KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > ndfs > NDFS


1 /* Copyright (c) 2004 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3 package net.nutch.ndfs;
4
5 import net.nutch.io.*;
6 import net.nutch.ipc.*;
7 import net.nutch.util.*;
8
9 import java.io.*;
10 import java.net.*;
11 import java.util.*;
12 import java.util.logging.*;
13
14 /********************************************************
15  * The NDFS class holds the NDFS client and server.
16  *
17  * @author Mike Cafarella
18  ********************************************************/

19 public class NDFS implements FSConstants {
20     public static final Logger LOG = LogFormatter.getLogger("net.nutch.fs.NDFS");
21     private static final long GIGABYTE = 1024 * 1024 * 1024;
22     private static long numGigs = NutchConf.getLong("ndfs.datanode.maxgigs", 100);
23
24     //
25
// Eventually, this constant should be computed dynamically using
26
// load information
27
//
28
private static final int MAX_BLOCKS_PER_ROUNDTRIP = 3;
29
30     /**
31      * Util method to build socket addr from string
32      */

33     public static InetSocketAddress createSocketAddr(String JavaDoc s) throws IOException {
34         String JavaDoc target = s;
35         int colonIndex = target.indexOf(':');
36         String JavaDoc host = target.substring(0, colonIndex);
37         int port = Integer.parseInt(target.substring(colonIndex + 1));
38
39         return new InetSocketAddress(host, port);
40     }
41
42     /**
43      * This class isn't for the outside world
44      */

45     private NDFS() {
46     }
47
////////////////////////////////////////////////////////////////
//
// NameNode
//
////////////////////////////////////////////////////////////////

54     /**********************************************************
55      * NameNode controls two critical tables:
56      * 1) filename->blocksequence,version
57      * 2) block->machinelist
58      *
59      * The first table is stored on disk and is very precious.
60      * The second table is rebuilt every time the NameNode comes
61      * up.
62      *
63      * @author Mike Cafarella
64      **********************************************************/

65     public static class NameNode extends net.nutch.ipc.Server {
66         FSNamesystem namesystem;
67
68         /**
69          * Create a NameNode at the specified location
70          */

71         public NameNode(File dir, int port) throws IOException {
72             super(port, FSParam.class, 10);
73             this.namesystem = new FSNamesystem(dir);
74         }
75
76         /**
77          * This method implements the call invoked by client.
78          * Return a Writable as the function return value.
79          */

80         public Writable call(Writable param) throws IOException {
81             FSParam p = (FSParam) param;
82             FSResults r = null;
83
84             switch (p.op) {
85             //
86
// Client methods
87
//
88
case OP_CLIENT_OPEN: {
89                 UTF8 src = (UTF8) p.first;
90                 Object JavaDoc results[] = namesystem.open(src);
91                 if (results != null) {
92                     Block blocks[] = (Block[]) results[0];
93                     DatanodeInfo sets[][] = (DatanodeInfo[][]) results[1];
94                     r = new FSResults(OP_CLIENT_OPEN_ACK, new ArrayWritable(Block.class, blocks), new TwoDArrayWritable(DatanodeInfo.class, sets));
95                 } else {
96                     r = new FSResults(OP_FAILURE);
97                 }
98                 break;
99             }
100             case OP_CLIENT_STARTFILE: {
101                 UTF8 nameParams[] = (UTF8[]) ((ArrayWritable) p.first).toArray();
102                 UTF8 src = nameParams[0];
103                 UTF8 clientName = nameParams[1];
104                 boolean overwrite = ((BooleanWritable) p.second).get();
105                 Object JavaDoc results[] = namesystem.startFile(src, clientName, overwrite);
106                 if (results != null) {
107                     Block b = (Block) results[0];
108                     DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
109                     r = new FSResults(OP_CLIENT_STARTFILE_ACK, b, new ArrayWritable(DatanodeInfo.class, targets));
110                 } else {
111                     r = new FSResults(OP_FAILURE);
112                 }
113                 break;
114             }
115             case OP_CLIENT_ADDBLOCK: {
116                 UTF8 src = (UTF8) p.first;
117                 Object JavaDoc results[] = namesystem.getAdditionalBlock(src);
118                 if (results != null && results[0] == null) {
119                     try {
120                         Thread.sleep(50);
121                     } catch (InterruptedException JavaDoc ie) {
122                     }
123                     results = namesystem.getAdditionalBlock(src);
124                 }
125
126                 if (results != null) {
127                     if (results[0] != null) {
128                         Block b = (Block) results[0];
129                         DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
130                         r = new FSResults(OP_CLIENT_ADDBLOCK_ACK, b, new ArrayWritable(DatanodeInfo.class, targets));
131                     } else {
132                         r = new FSResults(OP_CLIENT_TRYAGAIN);
133                     }
134                 } else {
135                     r = new FSResults(OP_FAILURE);
136                 }
137                 break;
138             }
139             case OP_CLIENT_ABANDONBLOCK: {
140                 Block b = (Block) p.first;
141                 UTF8 src = (UTF8) p.second;
142                 boolean success = namesystem.abandonBlock(b, src);
143                 if (success) {
144                     r = new FSResults(OP_CLIENT_ABANDONBLOCK_ACK);
145                 } else {
146                     r = new FSResults(OP_FAILURE);
147                 }
148                 break;
149             }
150             case OP_CLIENT_COMPLETEFILE: {
151                 UTF8 nameParams[] = (UTF8[]) ((ArrayWritable) p.first).toArray();
152                 UTF8 src = nameParams[0];
153                 UTF8 clientName = nameParams[1];
154                 int returnCode = namesystem.completeFile(src, clientName);
155                 if (returnCode == COMPLETE_SUCCESS) {
156                     r = new FSResults(OP_CLIENT_COMPLETEFILE_ACK);
157                 } else if (returnCode == STILL_WAITING) {
158                     r = new FSResults(OP_CLIENT_TRYAGAIN);
159                 } else {
160                     r = new FSResults(OP_FAILURE);
161                 }
162                 break;
163             }
164             case OP_CLIENT_RENAMETO: {
165                 UTF8 src = (UTF8) p.first;
166                 UTF8 dst = (UTF8) p.second;
167                 boolean success = namesystem.renameTo(src, dst);
168                 if (success) {
169                     r = new FSResults(OP_CLIENT_RENAMETO_ACK);
170                 } else {
171                     r = new FSResults(OP_FAILURE);
172                 }
173                 break;
174             }
175             case OP_CLIENT_DELETE: {
176                 UTF8 src = (UTF8) p.first;
177                 boolean success = namesystem.delete(src);
178                 if (success) {
179                     r = new FSResults(OP_CLIENT_DELETE_ACK);
180                 } else {
181                     r = new FSResults(OP_FAILURE);
182                 }
183                 break;
184             }
185             case OP_CLIENT_EXISTS: {
186                 UTF8 src = (UTF8) p.first;
187                 boolean success = namesystem.exists(src);
188                 if (success) {
189                     r = new FSResults(OP_CLIENT_EXISTS_ACK);
190                 } else {
191                     r = new FSResults(OP_FAILURE);
192                 }
193                 break;
194             }
195             case OP_CLIENT_ISDIR: {
196                 UTF8 src = (UTF8) p.first;
197                 boolean success = namesystem.isDir(src);
198                 if (success) {
199                     r = new FSResults(OP_CLIENT_ISDIR_ACK);
200                 } else {
201                     r = new FSResults(OP_FAILURE);
202                 }
203                 break;
204             }
205             case OP_CLIENT_MKDIRS: {
206                 UTF8 src = (UTF8) p.first;
207                 boolean success = namesystem.mkdirs(src);
208                 if (success) {
209                     r = new FSResults(OP_CLIENT_MKDIRS_ACK);
210                 } else {
211                     r = new FSResults(OP_FAILURE);
212                 }
213                 break;
214             }
215             case OP_CLIENT_OBTAINLOCK: {
216                 UTF8 nameParams[] = (UTF8[]) ((ArrayWritable) p.first).toArray();
217                 UTF8 src = nameParams[0];
218                 UTF8 clientName = nameParams[1];
219                 boolean exclusive = ((BooleanWritable) p.second).get();
220                 int returnCode = namesystem.obtainLock(src, clientName, exclusive);
221                 if (returnCode == COMPLETE_SUCCESS) {
222                     r = new FSResults(OP_CLIENT_OBTAINLOCK_ACK);
223                 } else if (returnCode == STILL_WAITING) {
224                     r = new FSResults(OP_CLIENT_TRYAGAIN);
225                 } else {
226                     r = new FSResults(OP_FAILURE);
227                 }
228                 break;
229             }
230             case OP_CLIENT_RELEASELOCK: {
231                 UTF8 nameParams[] = (UTF8[]) ((ArrayWritable) p.first).toArray();
232                 UTF8 src = nameParams[0];
233                 UTF8 clientName = nameParams[1];
234                 int returnCode = namesystem.releaseLock(src, clientName);
235                 if (returnCode == COMPLETE_SUCCESS) {
236                     r = new FSResults(OP_CLIENT_COMPLETEFILE_ACK);
237                 } else if (returnCode == STILL_WAITING) {
238                     r = new FSResults(OP_CLIENT_TRYAGAIN);
239                 } else {
240                     r = new FSResults(OP_FAILURE);
241                 }
242                 break;
243             }
244             case OP_CLIENT_RENEW_LEASE: {
245                 UTF8 clientName = (UTF8) p.first;
246                 namesystem.renewLease(clientName);
247                 r = new FSResults(OP_CLIENT_RENEW_LEASE_ACK);
248                 break;
249             }
250             case OP_CLIENT_LISTING: {
251                 UTF8 src = (UTF8) p.first;
252                 NDFSFileInfo results[] = namesystem.getListing(src);
253                 if (results != null) {
254                     r = new FSResults(OP_CLIENT_LISTING_ACK, new ArrayWritable(NDFSFileInfo.class, results));
255                 } else {
256                     r = new FSResults(OP_FAILURE);
257                 }
258                 break;
259             }
260             case OP_CLIENT_RAWSTATS: {
261                 long totalRaw = namesystem.totalCapacity();
262                 long remainingRaw = namesystem.totalRemaining();
263                 LongWritable results[] = new LongWritable[2];
264                 results[0] = new LongWritable(totalRaw);
265                 results[1] = new LongWritable(totalRaw - remainingRaw);
266                 r = new FSResults(OP_CLIENT_RAWSTATS_ACK, new ArrayWritable(LongWritable.class, results));
267                 break;
268             }
269             case OP_CLIENT_DATANODEREPORT: {
270                 DatanodeInfo report[] = namesystem.datanodeReport();
271                 if (report != null) {
272                     r = new FSResults(OP_CLIENT_DATANODEREPORT_ACK, new ArrayWritable(DatanodeInfo.class, report));
273                 } else {
274                     r = new FSResults(OP_FAILURE);
275                 }
276                 break;
277             }
278
279             //
280
// Datanode methods
281
//
282
case OP_HEARTBEAT:
283             case OP_BLOCKREPORT:
284             case OP_BLOCKRECEIVED:
285             case OP_ERROR: {
286                 UTF8 sender = null;
287                 if (p.op == OP_HEARTBEAT) {
288                     // Receive heartbeat, update last-seen info
289
HeartbeatData hd = (HeartbeatData) p.first;
290                     sender = hd.getName();
291                     namesystem.gotHeartbeat(sender, hd.getCapacity(), hd.getRemaining());
292
293                 } else if (p.op == OP_BLOCKREPORT) {
294                     // Receive report on blocks stored at datanode
295
Block blocks[] = (Block[]) ((ArrayWritable) p.first).toArray();
296                     sender = (UTF8) p.second;
297                     namesystem.processReport(blocks, sender);
298
299                 } else if (p.op == OP_BLOCKRECEIVED) {
300                     // Receive info on block that's just been received by datanode
301
Writable blocks[] = ((ArrayWritable) p.first).get();
302                     sender = (UTF8) p.second;
303                     for (int i = 0; i < blocks.length; i++) {
304                         namesystem.blockReceived((Block) blocks[i], sender);
305                     }
306                 } else {
307                     // Got an error report from datanode
308
System.err.println("ERR from datanode! Op = " + p.op);
309                     sender = (UTF8) p.first;
310                     System.err.println("Datanode: " + sender);
311                     System.err.println("Message: " + ((UTF8) p.second));
312                 }
313
314                 //
315
// Compute return message
316
//
317
Object JavaDoc xferResults[] = namesystem.pendingTransfers(new DatanodeInfo(sender), MAX_BLOCKS_PER_ROUNDTRIP);
318                 if (xferResults != null) {
319                     r = new FSResults(OP_TRANSFERBLOCKS, new ArrayWritable(Block.class, (Block[]) xferResults[0]), new TwoDArrayWritable(DatanodeInfo.class, (DatanodeInfo[][]) xferResults[1]));
320                 } else {
321                     Block blocks[] = namesystem.recentlyInvalidBlocks(sender);
322                     if (blocks == null) {
323                         blocks = namesystem.checkObsoleteBlocks(sender);
324                     }
325                     if (blocks != null) {
326                         r = new FSResults(OP_INVALIDATE_BLOCKS, new ArrayWritable(Block.class, blocks));
327                     } else {
328                         r = new FSResults(OP_ACK);
329                     }
330                 }
331                 break;
332             }
333             default:
334                 throw new RuntimeException JavaDoc("Unknown op code: " + p.op);
335             }
336             return r;
337         }
338
339         /**
340          */

341         public static void main(String JavaDoc argv[]) throws IOException, InterruptedException JavaDoc {
342             if (argv.length < 2) {
343                 System.out.println("NDFS$NameNode <port> <namespace_dir>");
344                 System.exit(-1);
345             }
346
347             int port = Integer.parseInt(argv[0]);
348             File dir = new File(argv[1]);
349
350             NameNode namenode = new NameNode(dir, port);
351             namenode.start();
352             namenode.join();
353         }
354     }
355
////////////////////////////////////////////////////////////////
//
// DataNode
//
////////////////////////////////////////////////////////////////

362     /**********************************************************
363      * DataNode controls just one critical table:
364      * block-> BLOCK_SIZE stream of bytes
365      *
366      * This info is stored on disk (the NameNode is responsible for
367      * asking other machines to replicate the data). The DataNode
368      * reports the table's contents to the NameNode upon startup
369      * and every so often afterwards.
370      *
371      * @author Mike Cafarella
372      **********************************************************/

373     public static class DataNode extends net.nutch.ipc.Client {
374         FSDataset data;
375         String JavaDoc localName;
376         InetSocketAddress nameNodeAddr;
377
378         Vector receivedBlockList = new Vector();
379         /**
380          * Needs a directory to find its data (and config info)
381          */

382         public DataNode(String JavaDoc machineName, File dir, InetSocketAddress nameNodeAddr) throws IOException {
383             super(FSResults.class);
384             long capacity = numGigs * GIGABYTE;
385             this.data = new FSDataset(dir, capacity);
386             this.nameNodeAddr = nameNodeAddr;
387
388             ServerSocket ss = null;
389             int tmpPort = 7000;
390             while (ss == null) {
391                 try {
392                     ss = new ServerSocket(tmpPort);
393                     LOG.info("Opened server at " + tmpPort);
394                 } catch (IOException ie) {
395                     LOG.info("Could not open server at " + tmpPort + ", trying new port");
396                     tmpPort++;
397                 }
398             }
399             this.localName = machineName + ":" + tmpPort;
400             new Thread JavaDoc(new DataXceiveServer(ss)).start();
401         }
402
403         /**
404          * Main loop for the DataNode. Runs until shutdown.
405          */

406         public void offerService() throws Exception JavaDoc {
407             long wakeups = 0;
408             long lastHeartbeat = 0, lastBlockReport = 0;
409             long sendStart = System.currentTimeMillis();
410             int heartbeatsSent = 0;
411
412             //
413
// Now loop for a long time....
414
//
415
boolean shouldRun = true;
416             while (shouldRun) {
417                 long now = System.currentTimeMillis();
418
419                 //
420
// Every so often, send heartbeat or block-report
421
//
422
FSParam p = null;
423                 FSResults r = null;
424
425                 synchronized (receivedBlockList) {
426                     if (now - lastHeartbeat > HEARTBEAT_INTERVAL) {
427                         //
428
// All heartbeat messages include following info:
429
// -- Datanode name
430
// -- data transfer port
431
// -- Total capacity
432
// -- Bytes remaining
433
//
434
p = new FSParam(OP_HEARTBEAT, new HeartbeatData(localName, data.getCapacity(), data.getRemaining()));
435                         lastHeartbeat = now;
436                     } else if (now - lastBlockReport > BLOCKREPORT_INTERVAL) {
437                         //
438
// Send latest blockinfo report if timer has expired
439
//
440
p = new FSParam(OP_BLOCKREPORT, new ArrayWritable(Block.class, data.getBlockReport()), new UTF8(localName));
441                         lastBlockReport = now;
442                     } else if (receivedBlockList.size() > 0) {
443                         //
444
// Send newly-received blockids to namenode
445
//
446
Block blockArray[] = null;
447                         blockArray = (Block[]) receivedBlockList.toArray(new Block[receivedBlockList.size()]);
448                         receivedBlockList.removeAllElements();
449                         p = new FSParam(OP_BLOCKRECEIVED, new ArrayWritable(Block.class, blockArray), new UTF8(localName));
450                     } else {
451                         //
452
// Nothing to do; sleep (until time elapses, or work arrives)
453
// and continue work
454
//
455
long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat);
456                         if (waitTime > 0) {
457                             try {
458                                 receivedBlockList.wait(waitTime);
459                             } catch (InterruptedException JavaDoc ie) {
460                             }
461                         }
462                         continue;
463                     }
464                 }
465
466                 //
467
// Invoke remote call
468
//
469
r = (FSResults) call(p, nameNodeAddr);
470                 if (r == null) {
471                     throw new IOException("No response to remote call to " + nameNodeAddr);
472                 }
473
474                 //
475
// Respond to namenode's reply
476
//
477
// REMIND - mjc - Right now the Datanode can only get
478
// requests from the Namenode via a response to HEARTBEAT
479
// or BLOCKREPORT. That's not so hot, as it means the
480
// Namenode will have to wait for HEARTBEAT_INTERVAL before
481
// it can make a request on the Datanode.
482
//
483
// The advantage of this system is that it's very simple.
484
// We'll fix it later on.
485
//
486
switch (r.op) {
487                 case OP_ACK: {
488                     //
489
// The nameserver just acked our call, and didn't
490
// want anything else.
491
//
492
break;
493                 }
494                 case OP_TRANSFERBLOCKS: {
495                     //
496
// Send a copy of the indicated block to another
497
// datanode
498
//
499
Block blocks[] = (Block[]) ((ArrayWritable) r.first).toArray();
500                     DatanodeInfo xferTargets[][] = (DatanodeInfo[][]) ((TwoDArrayWritable) r.second).toArray();
501                     for (int i = 0; i < blocks.length; i++) {
502                         if (!data.isValidBlock(blocks[i])) {
503                             System.out.println("Invoking error! " + localName);
504                             call(new FSParam(OP_ERROR, new UTF8(localName), new UTF8("Can't send invalid block " + blocks[i])), nameNodeAddr);
505                             break;
506                         } else {
507                             if (xferTargets[i].length > 0) {
508                                 LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
509                                 new Thread JavaDoc(new DataTransfer(xferTargets[i], blocks[i])).start();
510                             }
511                         }
512                     }
513                     break;
514                 }
515                 case OP_INVALIDATE_BLOCKS: {
516                     //
517
// Some local block(s) are obsolete and can be
518
// safely garbage-collected.
519
//
520
ArrayWritable aw = (ArrayWritable) r.first;
521                     Block b[] = (Block[]) aw.toArray();
522                     data.invalidate(b);
523                     break;
524                 }
525                 default:
526                     throw new RuntimeException JavaDoc("Unknown op code: " + r.op);
527                 }
528             }
529         }
530
531         /**
532          * Server used for receiving/sending a block of data
533          */

534         class DataXceiveServer implements Runnable JavaDoc {
535             ServerSocket ss;
536             public DataXceiveServer(ServerSocket ss) {
537                 this.ss = ss;
538             }
539
540             /**
541              */

542             public void run() {
543                 try {
544                     while (true) {
545                         Socket s = ss.accept();
546                         new Thread JavaDoc(new DataXceiver(s)).start();
547                     }
548                 } catch (IOException ie) {
549                     LOG.info("Exiting DataXceiveServer due to " + ie.toString());
550                 }
551             }
552         }
553
554         /**
555          * Thread for processing incoming/outgoing data stream
556          */

557         class DataXceiver implements Runnable JavaDoc {
558             Socket s;
559             public DataXceiver(Socket s) {
560                 this.s = s;
561             }
562
563             /**
564              */

565             public void run() {
566                 try {
567                     DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
568                     try {
569                         byte op = (byte) in.read();
570                         if (op == OP_WRITE_BLOCK) {
571                             //
572
// Read in the header
573
//
574
DataOutputStream reply = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
575                             try {
576                                 Block b = new Block();
577                                 b.readFields(in);
578                                 int numTargets = in.readInt();
579                                 if (numTargets <= 0) {
580                                     throw new IOException("Mislabelled incoming datastream.");
581                                 }
582                                 DatanodeInfo targets[] = new DatanodeInfo[numTargets];
583                                 for (int i = 0; i < targets.length; i++) {
584                                     DatanodeInfo tmp = new DatanodeInfo();
585                                     tmp.readFields(in);
586                                     targets[i] = tmp;
587                                 }
588                                 byte encodingType = (byte) in.read();
589                                 long len = in.readLong();
590
591                                 //
592
// Make sure curTarget is equal to this machine
593
// REMIND - mjc
594
//
595
DatanodeInfo curTarget = targets[0];
596
597                                 //
598
// Open local disk out
599
//
600
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b)));
601                                 InetSocketAddress mirrorTarget = null;
602                                 try {
603                                     //
604
// Open network conn to backup machine, if
605
// appropriate
606
//
607
DataInputStream in2 = null;
608                                     DataOutputStream out2 = null;
609                                     if (targets.length > 1) {
610                                         // Connect to backup machine
611
mirrorTarget = createSocketAddr(targets[1].getName().toString());
612                                         try {
613                                             Socket s = new Socket(mirrorTarget.getAddress(), mirrorTarget.getPort());
614                                             out2 = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
615                                             in2 = new DataInputStream(new BufferedInputStream(s.getInputStream()));
616
617                                             // Write connection header
618
out2.write(OP_WRITE_BLOCK);
619                                             b.write(out2);
620                                             out2.writeInt(targets.length - 1);
621                                             for (int i = 1; i < targets.length; i++) {
622                                                 targets[i].write(out2);
623                                             }
624                                             out2.write(encodingType);
625                                             out2.writeLong(len);
626                                         } catch (IOException ie) {
627                                             if (out2 != null) {
628                                                 try {
629                                                     out2.close();
630                                                     in2.close();
631                                                 } catch (IOException out2close) {
632                                                 } finally {
633                                                     out2 = null;
634                                                     in2 = null;
635                                                 }
636                                             }
637                                         }
638                                     }
639
640                                     //
641
// Process incoming data, copy to disk and
642
// maybe to network.
643
//
644
try {
645                                         boolean anotherChunk = true;
646                                         byte buf[] = new byte[2048];
647
648                                         while (anotherChunk) {
649                                             while (len > 0) {
650                                                 int bytesRead = in.read(buf, 0, Math.min(buf.length, (int) len));
651                                                 if (bytesRead >= 0) {
652                                                     out.write(buf, 0, bytesRead);
653                                                     if (out2 != null) {
654                                                         try {
655                                                             out2.write(buf, 0, bytesRead);
656                                                         } catch (IOException out2e) {
657                                                             //
658
// If stream-copy fails, continue
659
// writing to disk. We shouldn't
660
// interrupt client write.
661
//
662
try {
663                                                                 out2.close();
664                                                                 in2.close();
665                                                             } catch (IOException out2close) {
666                                                             } finally {
667                                                                 out2 = null;
668                                                                 in2 = null;
669                                                             }
670                                                         }
671                                                     }
672                                                 }
673                                                 len -= bytesRead;
674                                             }
675
676                                             if (encodingType == RUNLENGTH_ENCODING) {
677                                                 anotherChunk = false;
678                                             } else if (encodingType == CHUNKED_ENCODING) {
679                                                 len = in.readLong();
680                                                 if (out2 != null) {
681                                                     out2.writeLong(len);
682                                                 }
683                                                 if (len == 0) {
684                                                     anotherChunk = false;
685                                                 }
686                                             }
687                                         }
688
689                                         if (out2 == null) {
690                                             LOG.info("Received block " + b + " from " + s.getInetAddress());
691                                         } else {
692                                             out2.flush();
693                                             long complete = in2.readLong();
694                                             if (complete != WRITE_COMPLETE) {
695                                                 LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);
696                                             }
697                                             LOG.info("Received block " + b + " from " + s.getInetAddress() + " and mirrored to " + mirrorTarget);
698                                         }
699                                     } finally {
700                                         if (out2 != null) {
701                                             out2.close();
702                                             in2.close();
703                                         }
704                                     }
705                                 } finally {
706                                     out.close();
707                                 }
708                                 data.finalizeBlock(b);
709
710                                 //
711
// Tell the namenode that we've received this block
712
// in full.
713
//
714
synchronized (receivedBlockList) {
715                                     receivedBlockList.add(b);
716                                     receivedBlockList.notifyAll();
717                                 }
718
719                                 //
720
// Tell client job is done
721
//
722
reply.writeLong(WRITE_COMPLETE);
723                             } finally {
724                                 reply.close();
725                             }
726                         } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK) {
727                             //
728
// Read in the header
729
//
730
Block b = new Block();
731                             b.readFields(in);
732
733                             long toSkip = 0;
734                             if (op == OP_READSKIP_BLOCK) {
735                                 toSkip = in.readLong();
736                             }
737
738                             //
739
// Open reply stream
740
//
741
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
742                             try {
743                                 //
744
// Write filelen of -1 if error
745
//
746
if (! data.isValidBlock(b)) {
747                                     out.writeLong(-1);
748                                 } else {
749                                     //
750
// Get blockdata from disk
751
//
752
long len = data.getLength(b);
753                                     DataInputStream in2 = new DataInputStream(data.getBlockData(b));
754                                     out.writeLong(len);
755
756                                     if (op == OP_READSKIP_BLOCK) {
757                                         if (toSkip > len) {
758                                             toSkip = len;
759                                         }
760                                         long amtSkipped = in2.skip(toSkip);
761                                         out.writeLong(amtSkipped);
762                                     }
763
764                                     byte buf[] = new byte[4096];
765                                     try {
766                                         int bytesRead = in2.read(buf);
767                                         while (bytesRead >= 0) {
768                                             out.write(buf, 0, bytesRead);
769                                             len -= bytesRead;
770                                             bytesRead = in2.read(buf);
771                                         }
772                                     } catch (SocketException se) {
773                                         // This might be because the reader
774
// closed the stream early
775
} finally {
776                                         in2.close();
777                                     }
778                                 }
779                                 LOG.info("Served block " + b + " to " + s.getInetAddress());
780                             } finally {
781                                 out.close();
782                             }
783                         } else {
784                             while (op >= 0) {
785                                 System.out.println("Faulty op: " + op);
786                                 op = (byte) in.read();
787                             }
788                             throw new IOException("Unknown opcode for incoming data stream");
789                         }
790                     } finally {
791                         in.close();
792                     }
793                 } catch (IOException ie) {
794                     ie.printStackTrace();
795                 } finally {
796                     try {
797                         s.close();
798                     } catch (IOException ie2) {
799                     }
800                 }
801             }
802         }
803
804         /**
805          * Used for transferring a block of data
806          */

807         class DataTransfer implements Runnable JavaDoc {
808             InetSocketAddress curTarget;
809             DatanodeInfo targets[];
810             Block b;
811             byte buf[];
812
813             /**
814              * Connect to the first item in the target list. Pass along the
815              * entire target list, the block, and the data.
816              */

817             public DataTransfer(DatanodeInfo targets[], Block b) throws IOException {
818                 this.curTarget = createSocketAddr(targets[0].getName().toString());
819                 this.targets = targets;
820                 this.b = b;
821                 this.buf = new byte[2048];
822             }
823
824             /**
825              * Do the deed, write the bytes
826              */

827             public void run() {
828                 try {
829                     Socket s = new Socket(curTarget.getAddress(), curTarget.getPort());
830                     DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
831                     try {
832                         long filelen = data.getLength(b);
833                         DataInputStream in = new DataInputStream(new BufferedInputStream(data.getBlockData(b)));
834                         try {
835                             //
836
// Header info
837
//
838
out.write(OP_WRITE_BLOCK);
839                             b.write(out);
840                             out.writeInt(targets.length);
841                             for (int i = 0; i < targets.length; i++) {
842                                 targets[i].write(out);
843                             }
844                             out.write(RUNLENGTH_ENCODING);
845                             out.writeLong(filelen);
846
847                             //
848
// Write the data
849
//
850
while (filelen > 0) {
851                                 int bytesRead = in.read(buf, 0, (int) Math.min(filelen, buf.length));
852                                 out.write(buf, 0, bytesRead);
853                                 filelen -= bytesRead;
854                             }
855                         } finally {
856                             in.close();
857                         }
858                     } finally {
859                         out.close();
860                     }
861                     LOG.info("Replicated block " + b + " to " + curTarget);
862                 } catch (IOException ie) {
863                 }
864             }
865         }
866
867         /**
868          */

869         public static void main(String JavaDoc argv[]) throws IOException {
870             if (argv.length < 3) {
871                 System.out.println("NDFS$DataNode <dataDir> <localMachine> <namenode:port>");
872                 System.exit(-1);
873             }
874             
875             File dir = new File(argv[0]);
876             String JavaDoc localMachine = argv[1];
877             String JavaDoc nameNodeStr = argv[2];
878             int colon = nameNodeStr.indexOf(":");
879             if (colon < 0) {
880                 System.out.println("Incorrect <nameserver:port> param");
881                 System.exit(-1);
882             }
883             String JavaDoc nameNodeName = nameNodeStr.substring(0, colon);
884             int nameNodePort = Integer.parseInt(nameNodeStr.substring(colon + 1));
885             InetSocketAddress nameNodeAddr = new InetSocketAddress(nameNodeName, nameNodePort);
886
887             DataNode datanode = new DataNode(localMachine, dir, nameNodeAddr);
888             while (true) {
889                 try {
890                     datanode.offerService();
891                 } catch (Exception JavaDoc ex) {
892                     LOG.info("Lost connection to namenode [" + nameNodeAddr + "]. Retrying...");
893                     try {
894                         Thread.sleep(5000);
895                     } catch (InterruptedException JavaDoc ie) {
896                     }
897                 }
898             }
899         }
900     }
901 }
902
Popular Tags