package net.nutch.db;

import java.io.*;
import java.util.*;

import net.nutch.io.*;
import net.nutch.fs.*;
import net.nutch.util.*;

/**
 * Routes appended key/value pairs to one of several {@link EditSectionWriter}s.
 *
 * <p>A group is described by a metainfo file written by
 * {@link #createEditGroup}. That file holds a version byte, the keyspace type,
 * and one divider key per section. An instance of this class reads the divider
 * keys back, opens one section writer per machine, and on every
 * {@link #append} picks the section by binary-searching the divider keys with
 * the "inner key" produced by a {@link KeyExtractor}.
 */
public class EditSectionGroupWriter {
    /** Version byte written at the start of every metainfo file. */
    final static int CUR_VERSION = 0;

    /** Name of the directory (under "standard") that holds the metainfo files. */
    public final static String GROUP_METAINFO = "group_metainfo";

    /** Keyspace type whose divider keys are {@link UTF8} values. */
    public final static int URL_KEYSPACE = 0;

    /** Keyspace type whose divider keys are {@link MD5Hash} values. */
    public final static int MD5_KEYSPACE = 1;

    /**
     * Maps the key handed to {@link EditSectionGroupWriter#append} onto the
     * key that is actually compared against the section dividers.
     */
    public static abstract class KeyExtractor {
        /** Creates an extractor; there is no state to initialise. */
        public KeyExtractor() {
        }

        /**
         * Returns the value inside <code>key</code> that should be compared
         * against the section divider keys.
         */
        public abstract WritableComparable extractInnerKey(WritableComparable key);
    }

    /**
     * Extracts the page URL from a
     * <code>DistributedWebDBWriter.PageInstruction</code> key.
     */
    public static class PageURLExtractor extends KeyExtractor {
        public PageURLExtractor() {
        }

        public WritableComparable extractInnerKey(WritableComparable key) {
            return ((DistributedWebDBWriter.PageInstruction) key).getPage().getURL();
        }
    }

    /**
     * Extracts the page MD5 from a
     * <code>DistributedWebDBWriter.PageInstruction</code> key.
     */
    public static class PageMD5Extractor extends KeyExtractor {
        public PageMD5Extractor() {
        }

        public WritableComparable extractInnerKey(WritableComparable key) {
            return ((DistributedWebDBWriter.PageInstruction) key).getPage().getMD5();
        }
    }

    /**
     * Extracts the link URL from a
     * <code>DistributedWebDBWriter.LinkInstruction</code> key.
     */
    public static class LinkURLExtractor extends KeyExtractor {
        public LinkURLExtractor() {
        }

        public WritableComparable extractInnerKey(WritableComparable key) {
            return ((DistributedWebDBWriter.LinkInstruction) key).getLink().getURL();
        }
    }

    /**
     * Extracts the link's "from" ID from a
     * <code>DistributedWebDBWriter.LinkInstruction</code> key.
     */
    public static class LinkMD5Extractor extends KeyExtractor {
        public LinkMD5Extractor() {
        }

        public WritableComparable extractInnerKey(WritableComparable key) {
            return ((DistributedWebDBWriter.LinkInstruction) key).getLink().getFromID();
        }
    }

    /**
     * Writes the metainfo file for a new edit group to
     * <code>dbDir/standard/group_metainfo/label</code>.
     *
     * <p>The file contains, in order: one version byte, the keyspace type as
     * an int, and <code>numSections</code> divider keys picked at evenly
     * spaced positions from the matching <code>DBKeyDivision</code> divider
     * table.
     *
     * @param nfs           file system used to create the metainfo file
     * @param dbDir         database directory that contains "standard"
     * @param label         name of the edit group (used as the file name)
     * @param numSections   number of sections; must be between 1 and
     *                      <code>DBKeyDivision.MAX_SECTIONS</code> inclusive
     * @param keySpaceType  {@link #URL_KEYSPACE} or {@link #MD5_KEYSPACE}
     * @throws IllegalArgumentException if <code>numSections</code> is out of
     *         range or <code>keySpaceType</code> is not a known keyspace
     * @throws IOException if the metainfo file cannot be written
     */
    public static void createEditGroup(NutchFileSystem nfs, File dbDir, String label, int numSections, int keySpaceType) throws IOException {
        // A non-positive count would divide by zero below and silently
        // produce a metainfo file without any divider keys.
        if (numSections < 1) {
            throw new IllegalArgumentException("Number of sections must be at least 1, but was " + numSections);
        }
        if (numSections > DBKeyDivision.MAX_SECTIONS) {
            throw new IllegalArgumentException("Maximum number of sections is " + DBKeyDivision.MAX_SECTIONS);
        }
        if ((keySpaceType != URL_KEYSPACE) && (keySpaceType != MD5_KEYSPACE)) {
            throw new IllegalArgumentException("Unknown keyspace type: " + keySpaceType);
        }

        File metaInfoDir = new File(new File(dbDir, "standard"), GROUP_METAINFO);
        File metaInfo = new File(metaInfoDir, label);
        DataOutputStream out = new DataOutputStream(nfs.create(metaInfo));
        try {
            // write(int) emits a single byte; the reader uses read() to match.
            out.write(CUR_VERSION);
            out.writeInt(keySpaceType);

            // Spread the requested sections evenly over the divider table.
            // Because numSections <= MAX_SECTIONS the step is >= 1, so the
            // rounded index never reaches MAX_SECTIONS.
            double stepSize = DBKeyDivision.MAX_SECTIONS / (1.0 * numSections);
            if (keySpaceType == URL_KEYSPACE) {
                UTF8 url = new UTF8();
                for (int i = 0; i < numSections; i++) {
                    url.set(DBKeyDivision.URL_KEYSPACE_DIVIDERS[(int) Math.round(i * stepSize)]);
                    url.write(out);
                }
            } else {
                for (int i = 0; i < numSections; i++) {
                    DBKeyDivision.MD5_KEYSPACE_DIVIDERS[(int) Math.round(i * stepSize)].write(out);
                }
            }
        } finally {
            out.close();
        }
    }

    int machineNum = -1, totalMachines = 1;
    KeyExtractor extractor;
    String label;
    /** Divider keys read from the metainfo file, one per section. */
    WritableComparable sectionKeys[];
    /** One writer per section; index i pairs with sectionKeys[i]. */
    EditSectionWriter sectionWriters[];

    /**
     * Opens the edit group <code>label</code> for writing from the given
     * machine.
     *
     * <p>Reads <code>totalMachines</code> divider keys from the group's
     * metainfo file and opens one {@link EditSectionWriter} per section. If
     * opening any section writer fails, the writers opened so far are closed
     * before the failure is propagated.
     *
     * <p>NOTE(review): the metainfo path here is the relative
     * <code>standard/group_metainfo/label</code>, whereas
     * {@link #createEditGroup} writes below its <code>dbDir</code> argument.
     * Presumably <code>nfs</code> resolves both to the same file; confirm
     * against callers.
     *
     * <p>NOTE(review): the number of keys read is <code>totalMachines</code>,
     * while {@link #createEditGroup} writes <code>numSections</code> keys.
     * The two are assumed to be equal; the count is not stored in the file.
     *
     * @param nfs            file system to read the metainfo from and to hand
     *                       to the section writers
     * @param machineNum     index of this machine, in
     *                       <code>[0, totalMachines)</code>
     * @param totalMachines  number of machines, and of sections to open
     * @param label          name of the edit group
     * @param keyClass       key class passed to each section writer
     * @param valClass       value class passed to each section writer
     * @param extractor      produces the inner key used to choose a section
     * @throws IllegalArgumentException if <code>machineNum</code> is outside
     *         <code>[0, totalMachines)</code>
     * @throws IOException if the metainfo cannot be read or a section writer
     *         cannot be opened
     */
    public EditSectionGroupWriter(NutchFileSystem nfs, int machineNum, int totalMachines, String label, Class keyClass, Class valClass, EditSectionGroupWriter.KeyExtractor extractor) throws IOException {
        if (machineNum < 0 || machineNum >= totalMachines) {
            throw new IllegalArgumentException("machineNum is " + machineNum + ", and totalMachines is " + totalMachines);
        }

        this.machineNum = machineNum;
        this.totalMachines = totalMachines;
        this.extractor = extractor;
        this.label = label;

        File metaInfoDir = new File("standard", GROUP_METAINFO);
        File metaInfo = new File(metaInfoDir, label);
        DataInputStream in = new DataInputStream(nfs.open(metaInfo));
        try {
            // Consume the version byte so the stream stays aligned with what
            // createEditGroup wrote; the value itself is not validated here.
            in.read();
            int keySpaceType = in.readInt();

            this.sectionKeys = new WritableComparable[totalMachines];
            for (int i = 0; i < sectionKeys.length; i++) {
                WritableComparable key;
                if (keySpaceType == URL_KEYSPACE) {
                    key = new UTF8();
                } else {
                    key = new MD5Hash();
                }
                key.readFields(in);
                this.sectionKeys[i] = key;
            }
        } finally {
            in.close();
        }

        // Open one writer per section. If any of them fails, release the ones
        // already opened so they are not leaked.
        EditSectionWriter[] writers = new EditSectionWriter[totalMachines];
        boolean allOpened = false;
        try {
            for (int i = 0; i < writers.length; i++) {
                writers[i] = new EditSectionWriter(nfs, label, i, machineNum, keyClass, valClass);
            }
            allOpened = true;
        } finally {
            if (!allOpened) {
                closeQuietly(writers);
            }
        }
        this.sectionWriters = writers;
    }

    /**
     * Appends a key/value pair to the section whose divider range contains
     * the key's inner key.
     *
     * <p>The section is found by binary search: the chosen index is the last
     * section whose divider key compares less than or equal to the inner key,
     * or section 0 when the inner key sorts before every divider that is
     * examined.
     *
     * @param key  key to append; also the input to the {@link KeyExtractor}
     * @param val  value to append
     * @throws IOException if the section writer fails
     */
    public void append(WritableComparable key, Writable val) throws IOException {
        WritableComparable innerKey = extractor.extractInnerKey(key);

        if (sectionWriters.length == 0) {
            return;
        }

        // Invariant: the target section lies in [start, end).
        int start = 0;
        int end = sectionWriters.length;
        while (end - start > 1) {
            int pivot = start + (end - start) / 2;
            if (innerKey.compareTo(sectionKeys[pivot]) < 0) {
                end = pivot;
            } else {
                start = pivot;
            }
        }
        sectionWriters[start].append(key, val);
    }

    /**
     * Closes every section writer.
     *
     * <p>All writers are closed even if some of them fail; the first
     * <code>IOException</code> encountered is rethrown once every writer has
     * been attempted.
     *
     * @throws IOException the first failure reported by a section writer
     */
    public void close() throws IOException {
        IOException firstFailure = null;
        for (int i = 0; i < sectionWriters.length; i++) {
            try {
                sectionWriters[i].close();
            } catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Best-effort close of the non-null writers in <code>writers</code>.
     * Used only while unwinding a failed constructor, where the original
     * failure is the one that must reach the caller.
     */
    private static void closeQuietly(EditSectionWriter[] writers) {
        for (int i = 0; i < writers.length; i++) {
            if (writers[i] != null) {
                try {
                    writers[i].close();
                } catch (IOException ignored) {
                    // Deliberately dropped: a construction failure is already
                    // propagating and must not be masked.
                }
            }
        }
    }
}