KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > db > EditSectionGroupReader


1 /* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3 package net.nutch.db;
4
5 import java.io.*;
6 import java.util.*;
7 import java.util.logging.*;
8
9 import net.nutch.io.*;
10 import net.nutch.fs.*;
11 import net.nutch.util.*;
12
13 /*********************************************************
14  * The EditSectionGroupReader will read in an edits-file that
15  * was built in a distributed way. It acts as a "meta-SequenceFile",
16  * incorporating knowledge of Section numbering as well as
17  * process-synchronization. If you had different ideas
18  * about how to make the db-edits distributed (apart from using
19  * NFS), you'd implement them here.
20  *
21  * @author Mike Cafarella
22  *********************************************************/

23 public class EditSectionGroupReader {
24     static final Logger LOG = LogFormatter.getLogger("net.nutch.db.EditSectionGroupReader");
25     private final static String JavaDoc MERGED_EDITS = "merged_edits";
26     private final static int SLEEP_INTERVAL = 3000;
27     private final static int WORRY_INTERVALS = 5;
28
29     NutchFileSystem nfs;
30     String JavaDoc label;
31     int readerNum = -1, totalMachines = -1, numEdits = 0;
32     boolean sectionComplete = false;
33
34     /**
35      * Open the EditSectionGroupReader for the appropriate file.
36      */

37     public EditSectionGroupReader(NutchFileSystem nfs, String JavaDoc label, int readerNum, int totalMachines) {
38         this.nfs = nfs;
39         this.label = label;
40         this.readerNum = readerNum;
41         this.totalMachines = totalMachines;
42     }
43
44     /**
45      * Block until all contributions to the EditSection are present
46      * and complete. To figure out how many contributors there are,
47      * we load the meta-info first (which is written at section-create
48      * time).
49      */

50     private synchronized void sectionComplete() throws IOException {
51         if (! sectionComplete) {
52             //
53
// Make sure that every contributor's file is present.
54
// When all are present, we know this section is complete.
55
//
56
for (int i = 0; i < totalMachines; i++) {
57                 // Create the files we're interested in
58
File allEditsDir = new File("editsection." + readerNum, "editsdir." + i);
59                 File editsDir = new File(allEditsDir, label);
60                 File editsList = new File(editsDir, "editslist");
61                 File editsInfo = new File(editsDir, "editsinfo");
62
63                 // Block until the editsInfo file appears
64
while (! nfs.exists(editsInfo)) {
65                     try {
66                         Thread.sleep(2000);
67                     } catch (InterruptedException JavaDoc ie) {
68                     }
69                 }
70
71                 // Read in edit-list info
72
DataInputStream in = new DataInputStream(nfs.open(editsInfo));
73                 try {
74                     in.read(); // version
75
this.numEdits += in.readInt(); // numEdits
76
} finally {
77                     in.close();
78                 }
79             }
80             sectionComplete = true;
81         }
82     }
83
84
85     /**
86      * Return how many edits there are in this section. This
87      * method requires total section-completion before executing.
88      */

89     public int numEdits() throws IOException {
90         sectionComplete();
91         return numEdits;
92     }
93
94     /**
95      * Merge all the components of the Section into a single file
96      * and return the location. This method requires total section-
97      * completion before executing.
98      */

99     public void mergeSectionComponents(File mergedEditsFile) throws IOException {
100         // Wait till all edit-contributors are done.
101
sectionComplete();
102
103         //
104
// Figure out the keyclass
105
//
106
File allEdits0 = new File("editsection." + readerNum, "editsdir." + 0);
107         File editsDir0 = new File(allEdits0, label);
108         File editsList0 = new File(editsDir0, "editslist");
109         while (! nfs.exists(editsList0)) {
110             try {
111                 Thread.sleep(2000);
112             } catch (InterruptedException JavaDoc ie) {
113             }
114         }
115
116         SequenceFile.Reader test = new SequenceFile.Reader(nfs, editsList0.getPath());
117         Class JavaDoc keyClass = null;
118         try {
119             keyClass = test.getKeyClass();
120         } finally {
121             test.close();
122         }
123
124         //
125
// Now write out contents of each contributor's file
126
//
127
try {
128             Writable key = (Writable) keyClass.newInstance();
129             SequenceFile.Writer out = new SequenceFile.Writer(nfs, mergedEditsFile.getPath(), keyClass, NullWritable.class);
130
131             try {
132                 for (int i = 0; i < totalMachines; i++) {
133                     File allEditsDir = new File("editsection." + readerNum, "editsdir." + i);
134                     File editsDir = new File(allEditsDir, label);
135                     File editsList = new File(editsDir, "editslist");
136                     while (! nfs.exists(editsList)) {
137                         try {
138                             Thread.sleep(2000);
139                         } catch (InterruptedException JavaDoc ie) {
140                         }
141                     }
142
143                     SequenceFile.Reader in = new SequenceFile.Reader(nfs, editsList.getPath());
144                     try {
145                         while (in.next(key)) {
146                             out.append(key, NullWritable.get());
147                         }
148                     } finally {
149                         in.close();
150                     }
151                 }
152             } finally {
153                 out.close();
154             }
155         } catch (InstantiationException JavaDoc ie) {
156             throw new IOException("Could not create instance of " + keyClass);
157         } catch (IllegalAccessException JavaDoc iae) {
158             throw new IOException("Could not create instance of " + keyClass);
159         }
160     }
161
162     /**
163      * Get rid of the edits encapsulated by this file.
164      */

165     public void delete() throws IOException {
166         for (int i = 0; i < totalMachines; i++) {
167             // Delete the files we're interested in
168
File editsDir = new File("editsection." + readerNum, "editsdir." + i);
169             File consumedEdits = new File(editsDir, label);
170             nfs.delete(consumedEdits);
171         }
172     }
173 }
174
Popular Tags