KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > db > EditSectionGroupWriter


1 /* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3 package net.nutch.db;
4
5 import java.io.*;
6 import java.util.*;
7
8 import net.nutch.io.*;
9 import net.nutch.fs.*;
10 import net.nutch.util.*;
11
12 /*********************************************************
13  * The EditSectionGroupWriter maintains a set of EditSectionWriter
14  * objects. It chooses the appropriate EditSectionWriter to
15  * carry out each operation.
16  *
17  * @author Mike Cafarella
18  *********************************************************/

19 public class EditSectionGroupWriter {
20     final static int CUR_VERSION = 0;
21
22     // File for SectionGroupWriter meta-info
23
public final static String JavaDoc GROUP_METAINFO = "group_metainfo";
24     
25     //
26
// Keyspace identifiers
27
//
28
public static int URL_KEYSPACE = 0;
29     public static int MD5_KEYSPACE = 1;
30
31     /*********************************************************
32      * Edit instructions are Comparable, but they also have
33      * an "inner" key like MD5Hash or URL that is also Comparable.
34      * This class extracts that inner key, which we need for
35      * allocating a Page or Link Instruction into the correct
36      * bucket.
37      *********************************************************/

38     public static abstract class KeyExtractor {
39         /**
40          */

41         public KeyExtractor() {
42         }
43
44         /**
45          */

46         public abstract WritableComparable extractInnerKey(WritableComparable key);
47     }
48
49     /**
50      * Get the URL from a PageInstruction
51      */

52     public static class PageURLExtractor extends KeyExtractor {
53         public PageURLExtractor() {
54         }
55         public WritableComparable extractInnerKey(WritableComparable key) {
56             return ((DistributedWebDBWriter.PageInstruction) key).getPage().getURL();
57         }
58     }
59
60     /**
61      * Get the MD5 from a PageInstruction
62      */

63     public static class PageMD5Extractor extends KeyExtractor {
64         public PageMD5Extractor() {
65         }
66         public WritableComparable extractInnerKey(WritableComparable key) {
67             return ((DistributedWebDBWriter.PageInstruction) key).getPage().getMD5();
68         }
69     }
70
71     /**
72      * Get the URL from a LinkInstruction
73      */

74     public static class LinkURLExtractor extends KeyExtractor {
75         public LinkURLExtractor() {
76         }
77         public WritableComparable extractInnerKey(WritableComparable key) {
78             return ((DistributedWebDBWriter.LinkInstruction) key).getLink().getURL();
79         }
80     }
81
82     /**
83      * Get the MD5 from a LinkInstruction
84      */

85     public static class LinkMD5Extractor extends KeyExtractor {
86         public LinkMD5Extractor() {
87         }
88         public WritableComparable extractInnerKey(WritableComparable key) {
89             return ((DistributedWebDBWriter.LinkInstruction) key).getLink().getFromID();
90         }
91     }
92
93     /**
94      * Initialize an EditSectionGroup. Tell it the label, the
95      * keytype, and the division between keys.
96      */

97     public static void createEditGroup(NutchFileSystem nfs, File dbDir, String JavaDoc label, int numSections, int keySpaceType) throws IOException {
98         // Max num-sections
99
if (numSections > DBKeyDivision.MAX_SECTIONS) {
100             throw new IllegalArgumentException JavaDoc("Maximum number of sections is " + DBKeyDivision.MAX_SECTIONS);
101         }
102
103         // Test for known keyspace type.
104
if ((keySpaceType != URL_KEYSPACE) &&
105             (keySpaceType != MD5_KEYSPACE)) {
106             throw new IllegalArgumentException JavaDoc("Unknown keyspace type: " + keySpaceType);
107         }
108
109         File metaInfoDir = new File(new File(dbDir, "standard"), GROUP_METAINFO);
110         File metaInfo = new File(metaInfoDir, label);
111         DataOutputStream out = new DataOutputStream(nfs.create(metaInfo));
112         try {
113             out.write(CUR_VERSION);
114             out.writeInt(keySpaceType);
115
116             double stepSize = DBKeyDivision.MAX_SECTIONS / (1.0 * numSections);
117             if (keySpaceType == URL_KEYSPACE) {
118                 UTF8 url = new UTF8();
119                 for (int i = 0; i < numSections; i++) {
120                     url.set(DBKeyDivision.URL_KEYSPACE_DIVIDERS[(int) Math.round(i * stepSize)]);
121                     url.write(out);
122                 }
123             } else {
124                 for (int i = 0; i < numSections; i++) {
125                     DBKeyDivision.MD5_KEYSPACE_DIVIDERS[(int) Math.round(i * stepSize)].write(out);
126                 }
127             }
128         } finally {
129             out.close();
130         }
131     }
132
133
134     int machineNum = -1, totalMachines = 1;
135     KeyExtractor extractor;
136     String JavaDoc label;
137     WritableComparable sectionKeys[];
138     EditSectionWriter sectionWriters[];
139
140     /**
141      * Start a EditSectionGroupWriter at the indicated location, for
142      * a single emitter. There will be as many of these as there
143      * are processor-machines. (The emitter value will be different
144      * for each.) The Group must already have been created via a
145      * call to EditSectionGroupWriter.createEditSectionGroupWriter().
146      *
147      * The EditSectionGroupWriter consists of a bunch of
148      * EditSectionWriters, each of which hold a file we append to.
149      */

150     public EditSectionGroupWriter(NutchFileSystem nfs, int machineNum, int totalMachines, String JavaDoc label, Class JavaDoc keyClass, Class JavaDoc valClass, EditSectionGroupWriter.KeyExtractor extractor) throws IOException {
151         this.machineNum = machineNum;
152         this.totalMachines = totalMachines;
153         this.extractor = extractor;
154
155         // Bail if the emitter/section numbering is incorrect
156
if (machineNum < 0 || machineNum >= totalMachines) {
157             throw new IllegalArgumentException JavaDoc("machineNum is " + machineNum + ", and totalMachines is " + totalMachines);
158         }
159
160         // Load in details about keys
161
File metaInfoDir = new File("standard", GROUP_METAINFO);
162         File metaInfo = new File(metaInfoDir, label);
163         DataInputStream in = new DataInputStream(nfs.open(metaInfo));
164         try {
165             int version = in.read();
166             int keySpaceType = in.readInt();
167
168             this.sectionKeys = new WritableComparable[totalMachines];
169             for (int i = 0; i < sectionKeys.length; i++) {
170                 WritableComparable key = null;
171                 if (keySpaceType == URL_KEYSPACE) {
172                     key = new UTF8();
173                 } else {
174                     key = new MD5Hash();
175                 }
176                 key.readFields(in);
177                 this.sectionKeys[i] = key;
178             }
179         } finally {
180             in.close();
181         }
182
183         // Build all the sections
184
this.sectionWriters = new EditSectionWriter[totalMachines];
185         for (int i = 0; i < sectionWriters.length; i++) {
186             this.sectionWriters[i] = new EditSectionWriter(nfs, label, i, machineNum, keyClass, valClass);
187         }
188         this.label = label;
189     }
190
191     /**
192      * Add an instruction and append it. We need to find an
193      * appropriate EditSectionWriter.
194      */

195     public void append(WritableComparable key, Writable val) throws IOException {
196         WritableComparable innerKey = extractor.extractInnerKey(key);
197
198         // If there is noplace to write the item, return
199
if (sectionWriters.length == 0) {
200             return;
201         }
202
203         // (start, end] is the range
204
int start = 0, end = sectionWriters.length, pivot = 0;
205
206         while (end - start > 1) {
207             pivot = (end + start) / 2;
208             int comparison = innerKey.compareTo(sectionKeys[pivot]);
209
210             if (comparison < 0) {
211                 end = pivot;
212             } else if (comparison >= 0) {
213                 start = pivot;
214             }
215         }
216         sectionWriters[start].append(key, val);
217     }
218
219     /**
220      * Close down the writers
221      */

222     public void close() throws IOException {
223         for (int i = 0; i < sectionWriters.length; i++) {
224             sectionWriters[i].close();
225         }
226     }
227 }
228
229
Popular Tags