KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > ndfs > FSNamesystem


1 /* Copyright (c) 2004 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3 package net.nutch.ndfs;
4
5 import net.nutch.io.*;
6 import net.nutch.util.*;
7
8 import java.io.*;
9 import java.util.*;
10 import java.util.logging.*;
11
12 /***************************************************
13  * The FSNamesystem tracks several important tables.
14  *
15  * 1) valid fsname --> blocklist (kept on disk, logged)
16  * 2) Set of all valid blocks (inverted #1)
17  * 3) block --> machinelist (kept in memory, rebuilt dynamically from reports)
18  * 4) machine --> blocklist (inverted #3)
19  * 5) LRU cache of updated-heartbeat machines
20  ***************************************************/

21 public class FSNamesystem implements FSConstants {
22     public static final Logger LOG = LogFormatter.getLogger("net.nutch.fs.FSNamesystem");
23
24     // DESIRED_REPLICATION is how many copies we try to have at all times
25
final static int DESIRED_REPLICATION = 2;
26
27     // MIN_REPLICATION is how many copies we need in place or else we disallow the write
28
final static int MIN_REPLICATION = 1;
29
30     // HEARTBEAT_RECHECK is how often a datanode sends its hearbeat
31
final static long HEARTBEAT_RECHECK = 1000;
32
33     //
34
// Stores the correct file name hierarchy
35
//
36
FSDirectory dir;
37
38     //
39
// Stores the block-->datanode(s) map. Updated only in response
40
// to client-sent information.
41
//
42
TreeMap blocksMap = new TreeMap();
43
44     //
45
// Stores the datanode-->block map. Done by storing a
46
// set of datanode info objects, sorted by name. Updated only in
47
// response to client-sent information.
48
//
49
TreeMap datanodeMap = new TreeMap();
50
51     //
52
// Keeps a Vector for every named machine. The Vector contains
53
// blocks that have recently been invalidated and are thought to live
54
// on the machine in question.
55
//
56
TreeMap recentInvalidateSets = new TreeMap();
57
58     //
59
// Keeps track of files that are being created, plus the
60
// blocks that make them up.
61
//
62
TreeMap pendingCreates = new TreeMap();
63
64     //
65
// Keeps track of the blocks that are part of those pending creates
66
//
67
TreeSet pendingCreateBlocks = new TreeSet();
68
69     //
70
// Stats on overall usage
71
//
72
long totalCapacity = 0, totalRemaining = 0;
73
74     //
75
Random r = new Random();
76
77     //
78
// Stores a set of datanode info objects, sorted by heartbeat
79
//
80
TreeSet heartbeats = new TreeSet(new Comparator() {
81         public int compare(Object JavaDoc o1, Object JavaDoc o2) {
82             DatanodeInfo d1 = (DatanodeInfo) o1;
83             DatanodeInfo d2 = (DatanodeInfo) o2;
84             long lu1 = d1.lastUpdate();
85             long lu2 = d2.lastUpdate();
86             if (lu1 < lu2) {
87                 return -1;
88             } else if (lu1 > lu2) {
89                 return 1;
90             } else {
91                 return d1.getName().compareTo(d2.getName());
92             }
93         }
94     });
95
96     //
97
// Store set of Blocks that need to be replicated 1 or more times.
98
// We also store pending replication-orders.
99
//
100
TreeSet neededReplications = new TreeSet();
101     TreeSet pendingReplications = new TreeSet();
102
103     //
104
// Used for handling lock-leases
105
//
106
TreeMap leases = new TreeMap();
107     TreeSet sortedLeases = new TreeSet();
108
109     //
110
// Threaded object that checks to see if we have been
111
// getting heartbeats from all clients.
112
//
113
HeartbeatMonitor hbmon = null;
114     LeaseMonitor lmon = null;
115     Thread JavaDoc hbthread = null, lmthread = null;
116     boolean fsRunning = true;
117     long systemStart = 0;
118     
/**
 * Creates the namesystem and starts its two background monitor
 * threads (heartbeat expiry and lease expiry).
 *
 * @param dir where the filesystem directory state is stored
 * @throws IOException if the FSDirectory for dir cannot be created
 */
public FSNamesystem(File dir) throws IOException {
    this.dir = new FSDirectory(dir);

    // Record the start time before any monitor thread can observe it.
    this.systemStart = System.currentTimeMillis();

    // Keep references to the monitors so the hbmon/lmon fields are
    // no longer left null.
    this.hbmon = new HeartbeatMonitor();
    this.lmon = new LeaseMonitor();
    this.hbthread = new Thread(hbmon);
    this.lmthread = new Thread(lmon);

    // Start the threads last, once all other state is in place.
    hbthread.start();
    lmthread.start();
}
131
132     /**
133      */

134     public void close() {
135         fsRunning = false;
136         try {
137             hbthread.join();
138         } catch (InterruptedException JavaDoc ie) {
139         }
140         try {
141             lmthread.join();
142         } catch (InterruptedException JavaDoc ie) {
143         }
144     }
145
146     /////////////////////////////////////////////////////////
147
//
148
// These methods are called by NutchFS clients
149
//
150
/////////////////////////////////////////////////////////
151
/**
152      * The client wants to open the given filename. Return a
153      * list of (block,machineArray) pairs. The sequence of unique blocks
154      * in the list indicates all the blocks that make up the filename.
155      *
156      * The client should choose one of the machines from the machineArray
157      * at random.
158      */

159     public Object JavaDoc[] open(UTF8 src) {
160         Object JavaDoc results[] = null;
161         Block blocks[] = dir.getFile(src);
162         if (blocks != null) {
163             results = new Object JavaDoc[2];
164             DatanodeInfo machineSets[][] = new DatanodeInfo[blocks.length][];
165
166             for (int i = 0; i < blocks.length; i++) {
167                 TreeSet containingNodes = (TreeSet) blocksMap.get(blocks[i]);
168                 if (containingNodes == null) {
169                     machineSets[i] = new DatanodeInfo[0];
170                 } else {
171                     machineSets[i] = new DatanodeInfo[containingNodes.size()];
172                     int j = 0;
173                     for (Iterator it = containingNodes.iterator(); it.hasNext(); j++) {
174                         machineSets[i][j] = (DatanodeInfo) it.next();
175                     }
176                 }
177             }
178
179             results[0] = blocks;
180             results[1] = machineSets;
181         }
182         return results;
183     }
184
185     /**
186      * The client would like to create a new block for the indicated
187      * filename. Return an array that consists of the block, plus a set
188      * of machines. The first on this list should be where the client
189      * writes data. Subsequent items in the list must be provided in
190      * the connection to the first datanode.
191      */

192     public synchronized Object JavaDoc[] startFile(UTF8 src, UTF8 holder, boolean overwrite) {
193         Object JavaDoc results[] = null;
194         if (pendingCreates.get(src) == null) {
195             boolean fileValid = dir.isValidToCreate(src);
196             if (overwrite && ! fileValid) {
197                 delete(src);
198                 fileValid = true;
199             }
200
201             if (fileValid) {
202                 results = new Object JavaDoc[2];
203
204                 // Get the array of replication targets
205
DatanodeInfo targets[] = chooseTargets(DESIRED_REPLICATION, null);
206                 if (targets.length < MIN_REPLICATION) {
207                     return null;
208                 }
209
210                 // Reserve space for this pending file
211
pendingCreates.put(src, new Vector());
212                 synchronized (leases) {
213                     Lease lease = (Lease) leases.get(holder);
214                     if (lease == null) {
215                         lease = new Lease(holder);
216                         leases.put(holder, lease);
217                         sortedLeases.add(lease);
218                     } else {
219                         sortedLeases.remove(lease);
220                         lease.renew();
221                         sortedLeases.add(lease);
222                     }
223                     lease.startedCreate(src);
224                 }
225
226                 // Create next block
227
results[0] = allocateBlock(src);
228                 results[1] = targets;
229             }
230         }
231         return results;
232     }
233
234     /**
235      * The client would like to obtain an additional block for the indicated
236      * filename (which is being written-to). Return an array that consists
237      * of the block, plus a set of machines. The first on this list should
238      * be where the client writes data. Subsequent items in the list must
239      * be provided in the connection to the first datanode.
240      *
241      * Make sure the previous blocks have been reported by datanodes and
242      * are replicated. Will return an empty 2-elt array if we want the
243      * client to "try again later".
244      */

245     public synchronized Object JavaDoc[] getAdditionalBlock(UTF8 src) {
246         Object JavaDoc results[] = null;
247         if (dir.getFile(src) == null && pendingCreates.get(src) != null) {
248             results = new Object JavaDoc[2];
249
250             //
251
// If we fail this, bad things happen!
252
//
253
if (checkFileProgress(src)) {
254                 // Get the array of replication targets
255
DatanodeInfo targets[] = chooseTargets(DESIRED_REPLICATION, null);
256                 if (targets.length < MIN_REPLICATION) {
257                     return null;
258                 }
259
260                 // Create next block
261
results[0] = allocateBlock(src);
262                 results[1] = targets;
263             } else {
264                 LOG.info("File progress failure for " + src);
265                 Vector v = (Vector) pendingCreates.get(src);
266                 for (Iterator it = v.iterator(); it.hasNext(); ) {
267                     Block b = (Block) it.next();
268                     TreeSet containingNodes = (TreeSet) blocksMap.get(b);
269                     if (containingNodes == null || containingNodes.size() < MIN_REPLICATION) {
270                         LOG.info("Problem with block " + b + ", with " + (containingNodes == null ? "0" : "" + containingNodes.size()) + " nodes reporting in.");
271                     }
272                 }
273             }
274         }
275         return results;
276     }
277
278     /**
279      * The client would like to let go of the given block
280      */

281     public synchronized boolean abandonBlock(Block b, UTF8 src) {
282         //
283
// Remove the block from the pending creates list
284
//
285
Vector pendingVector = (Vector) pendingCreates.get(src);
286         if (pendingVector != null) {
287             for (Iterator it = pendingVector.iterator(); it.hasNext(); ) {
288                 Block cur = (Block) it.next();
289                 if (cur.compareTo(b) == 0) {
290                     pendingCreateBlocks.remove(cur);
291                     it.remove();
292                     return true;
293                 }
294             }
295         }
296         return false;
297     }
298
299     /**
300      * Finalize the created file and make it world-accessible. The
301      * FSNamesystem will already know the blocks that make up the file.
302      * Before we return, we make sure that all the file's blocks have
303      * been reported by datanodes and are replicated correctly.
304      */

305     public synchronized int completeFile(UTF8 src, UTF8 holder) {
306         if (dir.getFile(src) != null || pendingCreates.get(src) == null) {
307             return OPERATION_FAILED;
308         } else if (! checkFileProgress(src)) {
309             return STILL_WAITING;
310         } else {
311             Vector pendingVector = (Vector) pendingCreates.get(src);
312             Block pendingBlocks[] = (Block[]) pendingVector.toArray(new Block[pendingVector.size()]);
313
314             //
315
// We have the pending blocks, but they won't have
316
// length info in them (as they were allocated before
317
// data-write took place). So we need to add the correct
318
// length info to each
319
//
320
// REMIND - mjc - this is very inefficient! We should
321
// improve this!
322
//
323
for (int i = 0; i < pendingBlocks.length; i++) {
324                 Block b = pendingBlocks[i];
325                 TreeSet containingNodes = (TreeSet) blocksMap.get(b);
326                 DatanodeInfo node = (DatanodeInfo) containingNodes.first();
327                 for (Iterator it = node.getBlockIterator(); it.hasNext(); ) {
328                     Block cur = (Block) it.next();
329                     if (b.getBlockId() == cur.getBlockId()) {
330                         b.setNumBytes(cur.getNumBytes());
331                         break;
332                     }
333                 }
334             }
335             
336             //
337
// Now we can add the (name,blocks) tuple to the filesystem
338
//
339
if (dir.addFile(src, pendingBlocks)) {
340                 // The file is no longer pending
341
pendingCreates.remove(src);
342                 for (int i = 0; i < pendingBlocks.length; i++) {
343                     pendingCreateBlocks.remove(pendingBlocks[i]);
344                 }
345
346                 synchronized (leases) {
347                     Lease lease = (Lease) leases.get(holder);
348                     if (lease != null) {
349                         lease.completedCreate(src);
350                         if (! lease.hasLocks()) {
351                             leases.remove(holder);
352                             sortedLeases.remove(lease);
353                         }
354                     }
355                 }
356
357                 // Now that the file is real, we need to be sure to replicate
358
// the blocks.
359
for (int i = 0; i < pendingBlocks.length; i++) {
360                     TreeSet containingNodes = (TreeSet) blocksMap.get(pendingBlocks[i]);
361                     if (containingNodes.size() < DESIRED_REPLICATION) {
362                         synchronized (neededReplications) {
363                             neededReplications.add(pendingBlocks[i]);
364                         }
365                     }
366                 }
367                 return COMPLETE_SUCCESS;
368             }
369         }
370
371         return OPERATION_FAILED;
372     }
373
374     /**
375      * Allocate a block at the given pending filename
376      */

377     synchronized Block allocateBlock(UTF8 src) {
378         Block b = new Block();
379         Vector v = (Vector) pendingCreates.get(src);
380         v.add(b);
381         pendingCreateBlocks.add(b);
382         return b;
383     }
384
385     /**
386      * Check that the indicated file's blocks are present and
387      * replicated. If not, return false.
388      */

389     synchronized boolean checkFileProgress(UTF8 src) {
390         Vector v = (Vector) pendingCreates.get(src);
391
392         for (Iterator it = v.iterator(); it.hasNext(); ) {
393             Block b = (Block) it.next();
394             TreeSet containingNodes = (TreeSet) blocksMap.get(b);
395             if (containingNodes == null || containingNodes.size() < MIN_REPLICATION) {
396                 return false;
397             }
398         }
399         return true;
400     }
401
402     ////////////////////////////////////////////////////////////////
403
// Here's how to handle block-copy failure during client write:
404
// -- As usual, the client's write should result in a streaming
405
// backup write to a k-machine sequence.
406
// -- If one of the backup machines fails, no worries. Fail silently.
407
// -- Before client is allowed to close and finalize file, make sure
408
// that the blocks are backed up. Namenode may have to issue specific backup
409
// commands to make up for earlier datanode failures. Once all copies
410
// are made, edit namespace and return to client.
411
////////////////////////////////////////////////////////////////
412

413     /**
414      * Change the indicated filename.
415      */

416     public boolean renameTo(UTF8 src, UTF8 dst) {
417         return dir.renameTo(src, dst);
418     }
419
420     /**
421      * Remove the indicated filename from the namespace. This may
422      * invalidate some blocks that make up the file.
423      */

424     public synchronized boolean delete(UTF8 src) {
425         Block deletedBlocks[] = (Block[]) dir.delete(src);
426         if (deletedBlocks != null) {
427             for (int i = 0; i < deletedBlocks.length; i++) {
428                 Block b = deletedBlocks[i];
429
430                 TreeSet containingNodes = (TreeSet) blocksMap.get(b);
431                 if (containingNodes != null) {
432                     for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {
433                         DatanodeInfo node = (DatanodeInfo) it.next();
434                         Vector invalidateSet = (Vector) recentInvalidateSets.get(node.getName());
435                         if (invalidateSet == null) {
436                             invalidateSet = new Vector();
437                             recentInvalidateSets.put(node.getName(), invalidateSet);
438                         }
439                         invalidateSet.add(b);
440                     }
441                 }
442             }
443         }
444
445         return (deletedBlocks != null);
446     }
447
448     /**
449      * Return whether the given filename exists
450      */

451     public boolean exists(UTF8 src) {
452         if (dir.getFile(src) != null || dir.isDir(src)) {
453             return true;
454         } else {
455             return false;
456         }
457     }
458
459     /**
460      * Whether the given name is a directory
461      */

462     public boolean isDir(UTF8 src) {
463         return dir.isDir(src);
464     }
465
466     /**
467      * Create all the necessary directories
468      */

469     public boolean mkdirs(UTF8 src) {
470         return dir.mkdirs(src);
471     }
472
473     /************************************************************
474      * A Lease governs all the locks held by a single client.
475      * For each client there's a corresponding lease, whose
476      * timestamp is updated when the client periodically
477      * checks in. If the client dies and allows its lease to
478      * expire, all the corresponding locks can be released.
479      *************************************************************/

480     class Lease implements Comparable JavaDoc {
481         public UTF8 holder;
482         public long lastUpdate;
483         TreeSet locks = new TreeSet();
484         TreeSet creates = new TreeSet();
485
486         public Lease(UTF8 holder) {
487             LOG.info("New lease, holder " + holder);
488             this.holder = holder;
489             renew();
490         }
491         public void renew() {
492             this.lastUpdate = System.currentTimeMillis();
493         }
494         public boolean expired() {
495             if (System.currentTimeMillis() - lastUpdate > LEASE_PERIOD) {
496                 return true;
497             } else {
498                 return false;
499             }
500         }
501         public void obtained(UTF8 src) {
502             locks.add(src);
503         }
504         public void released(UTF8 src) {
505             locks.remove(src);
506         }
507         public void startedCreate(UTF8 src) {
508             creates.add(src);
509         }
510         public void completedCreate(UTF8 src) {
511             creates.remove(src);
512         }
513         public boolean hasLocks() {
514             return (locks.size() + creates.size()) > 0;
515         }
516         public void releaseLocks() {
517             for (Iterator it = locks.iterator(); it.hasNext(); ) {
518                 UTF8 src = (UTF8) it.next();
519                 internalReleaseLock(src, holder);
520             }
521             locks.clear();
522             internalReleaseCreates(creates);
523             creates.clear();
524         }
525
526         /**
527          */

528         public String JavaDoc toString() {
529             return "[Lease. Holder: " + holder.toString() + ", heldlocks: " + locks.size() + ", pendingcreates: " + creates.size() + "]";
530         }
531
532         /**
533          */

534         public int compareTo(Object JavaDoc o) {
535             Lease l1 = (Lease) this;
536             Lease l2 = (Lease) o;
537             long lu1 = l1.lastUpdate;
538             long lu2 = l2.lastUpdate;
539             if (lu1 < lu2) {
540                 return -1;
541             } else if (lu1 > lu2) {
542                 return 1;
543             } else {
544                 return l1.holder.compareTo(l2.holder);
545             }
546         }
547     }
548     /******************************************************
549      * LeaseMonitor checks for leases that have expired,
550      * and disposes of them.
551      ******************************************************/

552     class LeaseMonitor implements Runnable JavaDoc {
553         public void run() {
554             while (fsRunning) {
555                 synchronized (FSNamesystem.this) {
556                     synchronized (leases) {
557                         Lease top;
558                         while ((sortedLeases.size() > 0) &&
559                                ((top = (Lease) sortedLeases.first()) != null)) {
560                             if (top.expired()) {
561                                 top.releaseLocks();
562                                 leases.remove(top.holder);
563                                 LOG.info("Removing lease " + top + ", leases remaining: " + sortedLeases.size());
564                                 if (!sortedLeases.remove(top)) {
565                                     LOG.info("Unknown failure trying to remove " + top + " from lease set.");
566                                 }
567                             } else {
568                                 break;
569                             }
570                         }
571                     }
572                 }
573                 try {
574                     Thread.sleep(2000);
575                 } catch (InterruptedException JavaDoc ie) {
576                 }
577             }
578         }
579     }
580
581     /**
582      * Get a lock (perhaps exclusive) on the given file
583      */

584     public synchronized int obtainLock(UTF8 src, UTF8 holder, boolean exclusive) {
585         int result = dir.obtainLock(src, holder, exclusive);
586         if (result == COMPLETE_SUCCESS) {
587             synchronized (leases) {
588                 Lease lease = (Lease) leases.get(holder);
589                 if (lease == null) {
590                     lease = new Lease(holder);
591                     leases.put(holder, lease);
592                     sortedLeases.add(lease);
593                 } else {
594                     sortedLeases.remove(lease);
595                     lease.renew();
596                     sortedLeases.add(lease);
597                 }
598                 lease.obtained(src);
599             }
600         }
601         return result;
602     }
603
604     /**
605      * Release the lock on the given file
606      */

607     public synchronized int releaseLock(UTF8 src, UTF8 holder) {
608         int result = internalReleaseLock(src, holder);
609         if (result == COMPLETE_SUCCESS) {
610             synchronized (leases) {
611                 Lease lease = (Lease) leases.get(holder);
612                 if (lease != null) {
613                     lease.released(src);
614                     if (! lease.hasLocks()) {
615                         leases.remove(holder);
616                         sortedLeases.remove(lease);
617                     }
618                 }
619             }
620         }
621         return result;
622     }
623     private int internalReleaseLock(UTF8 src, UTF8 holder) {
624         return dir.releaseLock(src, holder);
625     }
626     private void internalReleaseCreates(TreeSet creates) {
627         for (Iterator it = creates.iterator(); it.hasNext(); ) {
628             UTF8 src = (UTF8) it.next();
629             Vector v = (Vector) pendingCreates.remove(src);
630             for (Iterator it2 = v.iterator(); it2.hasNext(); ) {
631                 Block b = (Block) it2.next();
632                 pendingCreateBlocks.remove(b);
633             }
634         }
635     }
636
637     /**
638      * Renew the lease(s) held by the given client
639      */

640     public void renewLease(UTF8 holder) {
641         synchronized (leases) {
642             Lease lease = (Lease) leases.get(holder);
643             if (lease != null) {
644                 sortedLeases.remove(lease);
645                 lease.renew();
646                 sortedLeases.add(lease);
647                 LOG.info("Renewed lease " + lease);
648             }
649         }
650     }
651
652     /**
653      * Get a listing of all files at 'src'. The Object[] array
654      * exists so we can return file attributes (soon to be implemented)
655      */

656     public NDFSFileInfo[] getListing(UTF8 src) {
657         return dir.getListing(src);
658     }
659
660     /////////////////////////////////////////////////////////
661
//
662
// These methods are called by datanodes
663
//
664
/////////////////////////////////////////////////////////
665
/**
666      * The given node has reported in. This method should:
667      * 1) Record the heartbeat, so the datanode isn't timed out
668      * 2) Adjust usage stats for future block allocation
669      */

670     public void gotHeartbeat(UTF8 name, long capacity, long remaining) {
671         synchronized (heartbeats) {
672             synchronized (datanodeMap) {
673                 long capacityDiff = 0;
674                 long remainingDiff = 0;
675                 DatanodeInfo nodeinfo = (DatanodeInfo) datanodeMap.get(name);
676
677                 if (nodeinfo == null) {
678                     nodeinfo = new DatanodeInfo(name, capacity, remaining);
679                     datanodeMap.put(name, nodeinfo);
680                     capacityDiff = capacity;
681                     remainingDiff = remaining;
682                 } else {
683                     capacityDiff = capacity - nodeinfo.getCapacity();
684                     remainingDiff = remaining - nodeinfo.getRemaining();
685                     heartbeats.remove(nodeinfo);
686                     nodeinfo.updateHeartbeat(capacity, remaining);
687                 }
688                 heartbeats.add(nodeinfo);
689                 totalCapacity += capacityDiff;
690                 totalRemaining += remainingDiff;
691             }
692         }
693     }
694
695     /**
696      * Periodically calls heartbeatCheck().
697      */

698     class HeartbeatMonitor implements Runnable JavaDoc {
699         /**
700          */

701         public void run() {
702             while (fsRunning) {
703                 heartbeatCheck();
704                 try {
705                     Thread.sleep(HEARTBEAT_RECHECK);
706                 } catch (InterruptedException JavaDoc ie) {
707                 }
708             }
709         }
710     }
711
712     /**
713      * Check if there are any expired heartbeats, and if so,
714      * whether any blocks have to be re-replicated.
715      */

716     synchronized void heartbeatCheck() {
717         synchronized (heartbeats) {
718             DatanodeInfo nodeInfo = null;
719
720             while ((heartbeats.size() > 0) &&
721                    ((nodeInfo = (DatanodeInfo) heartbeats.first()) != null) &&
722                    (nodeInfo.lastUpdate() < System.currentTimeMillis() - EXPIRE_INTERVAL)) {
723                 LOG.info("Lost heartbeat for " + nodeInfo.getName());
724
725                 heartbeats.remove(nodeInfo);
726                 synchronized (datanodeMap) {
727                     datanodeMap.remove(nodeInfo.getName());
728                 }
729                 totalCapacity -= nodeInfo.getCapacity();
730                 totalRemaining -= nodeInfo.getRemaining();
731
732                 Block deadblocks[] = nodeInfo.getBlocks();
733                 if (deadblocks != null) {
734                     for (int i = 0; i < deadblocks.length; i++) {
735                         removeStoredBlock(deadblocks[i], nodeInfo);
736                     }
737                 }
738
739                 if (heartbeats.size() > 0) {
740                     nodeInfo = (DatanodeInfo) heartbeats.first();
741                 }
742             }
743         }
744     }
745     
746     /**
747      * The given node is reporting all its blocks. Use this info to
748      * update the (machine-->blocklist) and (block-->machinelist) tables.
749      */

750     public synchronized void processReport(Block newReport[], UTF8 name) {
751         DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name);
752         if (node == null) {
753             throw new IllegalArgumentException JavaDoc("Unexpected exception. Received block report from node " + name + ", but there is no info for " + name);
754         }
755
756         //
757
// Modify the (block-->datanode) map, according to the difference
758
// between the old and new block report.
759
//
760
int oldPos = 0, newPos = 0;
761         Block oldReport[] = node.getBlocks();
762         while (oldReport != null && newReport != null && oldPos < oldReport.length && newPos < newReport.length) {
763             int cmp = oldReport[oldPos].compareTo(newReport[newPos]);
764             
765             if (cmp == 0) {
766                 // Do nothing, blocks are the same
767
oldPos++;
768                 newPos++;
769             } else if (cmp < 0) {
770                 // The old report has a block the new one does not
771
removeStoredBlock(oldReport[oldPos], node);
772                 oldPos++;
773             } else {
774                 // The new report has a block the old one does not
775
addStoredBlock(newReport[newPos], node);
776                 newPos++;
777             }
778         }
779         while (oldReport != null && oldPos < oldReport.length) {
780             // The old report has a block the new one does not
781
removeStoredBlock(oldReport[oldPos], node);
782             oldPos++;
783         }
784         while (newReport != null && newPos < newReport.length) {
785             // The new report has a block the old one does not
786
addStoredBlock(newReport[newPos], node);
787             newPos++;
788         }
789
790         //
791
// Modify node so it has the new blockreport
792
//
793
node.updateBlocks(newReport);
794     }
795
796     /**
797      * Modify (block-->datanode) map. Remove block from set of
798      * needed replications if this takes care of the problem.
799      */

800     synchronized void addStoredBlock(Block block, DatanodeInfo node) {
801         TreeSet containingNodes = (TreeSet) blocksMap.get(block);
802         if (containingNodes == null) {
803             containingNodes = new TreeSet();
804             blocksMap.put(block, containingNodes);
805         }
806         if (! containingNodes.contains(node)) {
807             containingNodes.add(node);
808         } else {
809             LOG.info("Redundant addStoredBlock request received for block " + block + " on node " + node);
810         }
811
812         synchronized (neededReplications) {
813             if (dir.isValidBlock(block)) {
814                 if (containingNodes.size() >= DESIRED_REPLICATION) {
815                     neededReplications.remove(block);
816                     pendingReplications.remove(block);
817                 } else if (containingNodes.size() < DESIRED_REPLICATION) {
818                     if (! neededReplications.contains(block)) {
819                         neededReplications.add(block);
820                     }
821                 }
822             }
823         }
824     }
825
826     /**
827      * Modify (block-->datanode) map. Possibly generate
828      * replication tasks, if the removed block is still valid.
829      */

830     synchronized void removeStoredBlock(Block block, DatanodeInfo node) {
831         TreeSet containingNodes = (TreeSet) blocksMap.get(block);
832         if (containingNodes == null || ! containingNodes.contains(node)) {
833             throw new IllegalArgumentException JavaDoc("No machine mapping found for block " + block + ", which should be at node " + node);
834         }
835         containingNodes.remove(node);
836
837         //
838
// It's possible that the block was removed because of a datanode
839
// failure. If the block is still valid, check if replication is
840
// necessary. In that case, put block on a possibly-will-
841
// be-replicated list.
842
//
843
if (dir.isValidBlock(block) && (containingNodes.size() < DESIRED_REPLICATION)) {
844             synchronized (neededReplications) {
845                 neededReplications.add(block);
846             }
847         }
848     }
849
850     /**
851      * The given node is reporting that it received a certain block.
852      */

853     public synchronized void blockReceived(Block block, UTF8 name) {
854         DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name);
855         if (node == null) {
856             throw new IllegalArgumentException JavaDoc("Unexpected exception. Got blockReceived message from node " + name + ", but there is no info for " + name);
857         }
858         //
859
// Modify the blocks->datanode map
860
//
861
addStoredBlock(block, node);
862
863         //
864
// Supplement node's blockreport
865
//
866
node.addBlock(block);
867     }
868
869     /**
870      * Total raw bytes
871      */

872     public long totalCapacity() {
873         return totalCapacity;
874     }
875
876     /**
877      * Total non-used raw bytes
878      */

879     public long totalRemaining() {
880         return totalRemaining;
881     }
882
883     /**
884      */

885     public DatanodeInfo[] datanodeReport() {
886         DatanodeInfo results[] = null;
887         synchronized (heartbeats) {
888             synchronized (datanodeMap) {
889                 results = new DatanodeInfo[datanodeMap.size()];
890                 int i = 0;
891                 for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
892                     DatanodeInfo cur = (DatanodeInfo) it.next();
893                     results[i++] = cur;
894                 }
895             }
896         }
897         return results;
898     }
899
900     /////////////////////////////////////////////////////////
901
//
902
// These methods are called by the Namenode system, to see
903
// if there is any work for a given datanode.
904
//
905
/////////////////////////////////////////////////////////
906

907     /**
908      * Return with a list of Blocks that should be invalidated
909      * at the given node. Done in response to a file delete, which
910      * eliminates a number of blocks from the universe.
911      */

912     public synchronized Block[] recentlyInvalidBlocks(UTF8 name) {
913         Vector invalidateSet = (Vector) recentInvalidateSets.remove(name);
914         if (invalidateSet == null) {
915             return null;
916         } else {
917             return (Block[]) invalidateSet.toArray(new Block[invalidateSet.size()]);
918         }
919     }
920
921     /**
922      * If the node has not been checked in some time, go through
923      * its blocks and find which ones are neither valid nor pending.
924      * It often happens that a client will start writing blocks and
925      * then exit. The blocks are on-disk, but the file will be
926      * abandoned.
927      *
928      * It's not enough to invalidate blocks at lease expiry time;
929      * datanodes can go down before the client's lease on
930      * the failed file expires and miss the "expire" event.
931      *
932      * This function considers every block on a datanode, and thus
933      * should only be invoked infrequently.
934      */

935     public synchronized Block[] checkObsoleteBlocks(UTF8 name) {
936         DatanodeInfo nodeInfo = (DatanodeInfo) datanodeMap.get(name);
937         if (System.currentTimeMillis() - nodeInfo.lastObsoleteCheck() <= OBSOLETE_INTERVAL) {
938             return null;
939         } else {
940             nodeInfo.updateObsoleteCheck();
941             Vector obsolete = new Vector();
942             for (Iterator it = nodeInfo.getBlockIterator(); it.hasNext(); ) {
943                 Block b = (Block) it.next();
944
945                 if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
946                     LOG.info("Obsoleting block " + b);
947                     obsolete.add(b);
948                 }
949             }
950             return (Block[]) obsolete.toArray(new Block[obsolete.size()]);
951         }
952     }
953
954     /**
955      * Return with a list of Block/DataNodeInfo sets, indicating
956      * where various Blocks should be copied, ASAP.
957      *
958      * The Array that we return consists of two objects:
959      * The 1st elt is an array of Blocks.
960      * The 2nd elt is a 2D array of DatanodeInfo objs, identifying the
961      * target sequence for the Block at the appropriate index.
962      *
963      */

964     public synchronized Object JavaDoc[] pendingTransfers(DatanodeInfo srcNode, int maxXfers) {
965         //
966
// Allow the namenode to come up and hear from all datanodes before
967
// making transfers.
968
//
969
if (System.currentTimeMillis() - systemStart < SYSTEM_STARTUP_PERIOD) {
970             return null;
971         }
972
973         synchronized (neededReplications) {
974             Object JavaDoc results[] = null;
975
976             if (neededReplications.size() > 0) {
977                 //
978
// Go through all blocks that need replications. See if any
979
// are present at the current node. If so, ask the node to
980
// replicate them.
981
//
982
Vector replicateBlocks = new Vector();
983                 Vector replicateTargetSets = new Vector();
984                 for (Iterator it = neededReplications.iterator(); it.hasNext(); ) {
985                     //
986
// We can only reply with 'maxXfers' or fewer blocks
987
//
988
if (replicateBlocks.size() >= maxXfers) {
989                         break;
990                     }
991
992                     Block block = (Block) it.next();
993                     if (! dir.isValidBlock(block)) {
994                         it.remove();
995                     } else {
996                         TreeSet containingNodes = (TreeSet) blocksMap.get(block);
997
998                         if (containingNodes.contains(srcNode)) {
999                             DatanodeInfo targets[] = chooseTargets(DESIRED_REPLICATION - containingNodes.size(), containingNodes);
1000                            if (targets.length > 0) {
1001                                // Build items to return
1002
replicateBlocks.add(block);
1003                                replicateTargetSets.add(targets);
1004                            }
1005                        }
1006                    }
1007                }
1008
1009                //
1010
// Move the block-replication into a "pending" state.
1011
// REMIND - mjc - the reason we use 'pending' is so we can retry
1012
// replications that fail after an appropriate amount of time.
1013
// This is not yet implemented
1014
//
1015
if (replicateBlocks.size() > 0) {
1016                    int i = 0;
1017                    for (Iterator it = replicateBlocks.iterator(); it.hasNext(); i++) {
1018                        Block block = (Block) it.next();
1019                        DatanodeInfo targets[] = (DatanodeInfo[]) replicateTargetSets.elementAt(i);
1020                        TreeSet containingNodes = (TreeSet) blocksMap.get(block);
1021
1022                        if (containingNodes.size() + targets.length >= DESIRED_REPLICATION) {
1023                            neededReplications.remove(block);
1024                            pendingReplications.add(block);
1025                        }
1026                    }
1027
1028                    //
1029
// Build returned objects from above lists
1030
//
1031
DatanodeInfo targetMatrix[][] = new DatanodeInfo[replicateTargetSets.size()][];
1032                    LOG.info("Pending transfer from " + srcNode.getName() + " to " + targetMatrix.length + " destinations");
1033                    for (i = 0; i < targetMatrix.length; i++) {
1034                        targetMatrix[i] = (DatanodeInfo[]) replicateTargetSets.elementAt(i);
1035                    }
1036
1037                    results = new Object JavaDoc[2];
1038                    results[0] = replicateBlocks.toArray(new Block[replicateBlocks.size()]);
1039                    results[1] = targetMatrix;
1040                }
1041            }
1042            return results;
1043        }
1044    }
1045
1046
1047    /**
1048     * Get a certain number of targets, if possible. If not,
1049     * return as many as we can.
1050     */

1051    DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes) {
1052        TreeSet alreadyChosen = new TreeSet();
1053        Vector targets = new Vector();
1054
1055        for (int i = 0; i < desiredReplicates; i++) {
1056            DatanodeInfo target = chooseTarget(forbiddenNodes, alreadyChosen);
1057            if (target != null) {
1058                targets.add(target);
1059                alreadyChosen.add(target);
1060            } else {
1061                break;
1062            }
1063        }
1064        return (DatanodeInfo[]) targets.toArray(new DatanodeInfo[targets.size()]);
1065    }
1066
1067    /**
1068     * Choose a target from available machines, excepting the
1069     * given ones.
1070     *
1071     * Right now it chooses randomly from available boxes. In future could
1072     * choose according to capacity and load-balancing needs (or even
1073     * network-topology, to avoid inter-switch traffic).
1074     */

1075    DatanodeInfo chooseTarget(TreeSet alreadyHasNode, TreeSet alreadyChosen) {
1076        int totalMachines = datanodeMap.size();
1077        if (totalMachines == 0) {
1078            return null;
1079        }
1080        int freeMachines = totalMachines;
1081        for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
1082            DatanodeInfo node = (DatanodeInfo) it.next();
1083            if ((alreadyHasNode != null && alreadyHasNode.contains(node)) ||
1084                (alreadyChosen != null && alreadyChosen.contains(node))) {
1085                freeMachines--;
1086            }
1087        }
1088
1089        //
1090
// Now pick one
1091
//
1092
DatanodeInfo target = null;
1093        if (freeMachines > 0) {
1094            //
1095
// Get all possible targets
1096
//
1097
int i = 0;
1098            DatanodeInfo targetlist[] = new DatanodeInfo[totalMachines];
1099            for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); i++) {
1100                targetlist[i] = (DatanodeInfo) it.next();
1101            }
1102        
1103            do {
1104                int index = r.nextInt(totalMachines);
1105                target = targetlist[index];
1106
1107                if ((alreadyHasNode != null && alreadyHasNode.contains(target)) ||
1108                    (alreadyChosen != null && alreadyChosen.contains(target))) {
1109                    target = null;
1110                }
1111            } while (target == null);
1112        }
1113        return target;
1114
1115        /**
1116         * Choose target weighted by available storage
1117         */

1118        /**
1119        synchronized (datanodeMap) {
1120            if (datanodeMap.size() == 0) {
1121                return;
1122            }
1123
1124            long totalRemaining = 0;
1125            Vector okTargets = new Vector();
1126            for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
1127                DatanodeInfo node = (DatanodeInfo) it.next();
1128                if ((alreadyHasNode == null || ! alreadyHasNode.contains(node)) &&
1129                    (alreadyChosen == null || ! alreadyChosen.contains(node))) {
1130                    okTargets.add(node);
1131                    totalRemaining += node.getRemaining();
1132                }
1133            }
1134
1135            //
1136            // Now pick one
1137            //
1138            DatanodeInfo target = null;
1139            if (okTargets.size() > 0) {
1140                //
1141                // Repeatedly choose random byte of the total bytes free.
1142                // The machine that has that byte will be our target. Thus,
1143                // we select at random with bias toward machines with greater
1144                // available storage.
1145                //
1146                long targetByte = r.nextLong(totalRemaining);
1147                for (Iterator it = okTargets.iterator(); it.hasNext(); ) {
1148                    DatanodeInfo node = (DatanodeInfO) it.next();
1149                    targetByte -= node.getRemaining();
1150                    if (targetByte <= 0) {
1151                        target = node;
1152                        break;
1153                    }
1154                }
1155            }
1156            return target;
1157        }
1158        **/

1159    }
1160}
1161
Popular Tags