package net.nutch.db;

import java.io.*;
import java.util.*;
import java.util.logging.*;

import net.nutch.io.*;
import net.nutch.pagedb.*;
import net.nutch.linkdb.*;

/**
 * A disk-backed set of buckets used to partition byte-array items.
 *
 * <p>Each item is assigned to a bucket by reading {@code keyBits} consecutive bits of the
 * item, most-significant bit first, starting at byte {@code keyStartByte}. Items are appended
 * to a per-bucket file named {@code bucket.<n>} inside {@code bucketsDir}; each record is a
 * 4-byte length (as written by {@link DataOutputStream#writeInt}) followed by the item bytes.
 *
 * <p>A set has two phases. During the insert phase {@link #storeItem} may be called. The first
 * call to {@link #getNextItem} closes all writers and switches to the read phase, after which
 * items are returned bucket by bucket (bucket 0 first, insertion order within a bucket) and
 * then {@code null}. The key parameters and current phase are written to a {@code config} file
 * on creation and on the phase switch, so a set can be reopened with {@link #loadBuckets}.
 * {@link #close} deletes every bucket file, the config file and the directory.
 *
 * <p>NOTE(review): the class performs no synchronization; assume single-threaded use.
 */
class BucketSet {
  final static String BUCKET_FILENAME = "bucket";
  final static String CONFIG_FILENAME = "config";
  final static int INTEGER_SIZE = 32;

  /**
   * Largest supported {@code keyBits}. {@code 1 << 30} is the largest power of two that is a
   * positive {@code int}, so every bucket index built from at most 30 bits is a valid array
   * index and the bucket count is a legal array length.
   */
  final static int MAX_KEY_BITS = INTEGER_SIZE - 2;

  /**
   * Reopens a bucket set previously created in {@code bucketsDir}, restoring its key
   * parameters and phase from the config file.
   *
   * @param bucketsDir existing directory created by {@link #createBuckets}
   * @return the reopened set
   * @throws IOException if the directory is missing, or the config file is unreadable or invalid
   */
  public static BucketSet loadBuckets(File bucketsDir) throws IOException {
    return new BucketSet(bucketsDir);
  }

  /**
   * Creates a new, empty bucket set in the insert phase.
   *
   * @param bucketsDir directory to create; it must not already exist
   * @param keyStartByte index of the first item byte used to compute the bucket
   * @param keyBits number of key bits, giving {@code 1 << keyBits} buckets (0..{@value #MAX_KEY_BITS})
   * @return the new set
   * @throws IOException if {@code keyBits} is out of range, the directory already exists or
   *     cannot be created, or the config file cannot be written
   */
  public static BucketSet createBuckets(File bucketsDir, int keyStartByte, int keyBits) throws IOException {
    return new BucketSet(bucketsDir, keyStartByte, keyBits);
  }

  /** Directory holding the config file and the per-bucket files. */
  File bucketsDir;
  /** Key location within each item, and the bucket {@link #getNextItem} is currently reading. */
  int keyStartByte, keyBits, curBucket;
  /** True during the insert phase, false once reading has begun. */
  boolean insertsAllowed;

  /** Per-bucket readers; allocated only in the read phase, opened lazily. */
  DataInputStream inStreams[];
  /** Per-bucket writers; allocated only in the insert phase, opened lazily. */
  DataOutputStream outStreams[];
  /** Number of buckets, always {@code 1 << keyBits}. */
  int numBuckets;
  /** Set by {@link #close}; the public instance methods throw afterwards. */
  boolean closed;

  /**
   * Opens an existing bucket set from its directory.
   *
   * @throws IOException if {@code bucketsDir} is not an existing directory or its config is invalid
   */
  BucketSet(File bucketsDir) throws IOException {
    if (!(bucketsDir.exists() && bucketsDir.isDirectory())) {
      throw new IOException("File " + bucketsDir + " either does not exist or is not a directory");
    }
    this.bucketsDir = bucketsDir;
    loadConfig();
    // numBuckets is not stored in the config file; it must be derived from keyBits,
    // otherwise the stream arrays would be empty and every bucket index out of range.
    this.numBuckets = bucketCountFor(keyBits);

    if (insertsAllowed) {
      this.outStreams = new DataOutputStream[numBuckets];
    } else {
      this.inStreams = new DataInputStream[numBuckets];
    }
    this.closed = false;
  }

  /**
   * Creates a new bucket set directory and writes its initial config.
   *
   * @throws IOException if {@code keyBits} is out of range, or the directory exists or cannot be created
   */
  BucketSet(File bucketsDir, int keyStartByte, int keyBits) throws IOException {
    // Validate before touching the file system, as the original ordering did.
    int bucketCount = bucketCountFor(keyBits);

    if (bucketsDir.exists()) {
      throw new IOException("Directory " + bucketsDir + " is already present.");
    }
    if (!bucketsDir.mkdir()) {
      throw new IOException("Could not create directory " + bucketsDir);
    }

    this.bucketsDir = bucketsDir;
    this.keyStartByte = keyStartByte;
    this.keyBits = keyBits;
    this.curBucket = 0;
    this.insertsAllowed = true;
    storeConfig();

    this.numBuckets = bucketCount;
    // A freshly created set always starts in the insert phase.
    this.outStreams = new DataOutputStream[numBuckets];
    this.closed = false;
  }

  /**
   * Closes all open bucket streams and deletes the bucket files, the config file and the
   * directory. Every stream is closed and every file deletion attempted even if an earlier
   * close fails; the first close failure is rethrown at the end.
   *
   * @throws IOException if the set is already closed, or if closing a stream failed
   */
  public void close() throws IOException {
    if (closed) {
      throw new IOException("BucketSet closed");
    }
    IOException firstFailure = null;
    for (int i = 0; i < numBuckets; i++) {
      try {
        if (insertsAllowed) {
          if (outStreams[i] != null) {
            outStreams[i].close();
          }
        } else if (inStreams[i] != null) {
          inStreams[i].close();
        }
      } catch (IOException e) {
        if (firstFailure == null) {
          firstFailure = e;
        }
      }
      bucketFile(i).delete();
    }
    new File(bucketsDir, CONFIG_FILENAME).delete();
    bucketsDir.delete();

    inStreams = null;
    outStreams = null;
    closed = true;

    if (firstFailure != null) {
      throw firstFailure;
    }
  }

  /**
   * Appends an item to the bucket selected by its key bits.
   *
   * @param item item bytes; must contain the key bits addressed by keyStartByte/keyBits
   * @throws IOException if the set is closed, reading has already begun, or the write fails
   */
  public void storeItem(byte item[]) throws IOException {
    if (closed) {
      throw new IOException("BucketSet closed");
    }
    if (!insertsAllowed) {
      throw new IOException("Insert no longer allowed to this BucketSet");
    }

    int bucket = computeBucket(item);
    if (outStreams[bucket] == null) {
      // Append mode: a set reopened via loadBuckets() in the insert phase must keep the
      // records already written to this bucket instead of truncating the file.
      outStreams[bucket] = new DataOutputStream(new FileOutputStream(bucketFile(bucket), true));
    }

    outStreams[bucket].writeInt(item.length);
    outStreams[bucket].write(item, 0, item.length);
  }

  /**
   * Returns the next stored item, switching to the read phase on the first call.
   *
   * @return the next item, or {@code null} once every bucket has been exhausted
   * @throws IOException if the set is closed, or a bucket file is unreadable or corrupt
   */
  public byte[] getNextItem() throws IOException {
    if (closed) {
      throw new IOException("BucketSet closed");
    }
    if (insertsAllowed) {
      switchToReadPhase();
    }

    // Walk forward from the current bucket; missing or exhausted buckets are skipped.
    while (curBucket < numBuckets) {
      DataInputStream in = openBucketForRead(curBucket);
      if (in != null) {
        int itemLen = readItemLength(in);
        if (itemLen >= 0) {
          byte newItem[] = new byte[itemLen];
          in.readFully(newItem);
          return newItem;
        }
        // Bucket exhausted: release its file handle before moving on.
        in.close();
        inStreams[curBucket] = null;
      }
      curBucket++;
    }
    return null;
  }

  /** Closes every writer, allocates the reader array and persists the read-phase config. */
  private void switchToReadPhase() throws IOException {
    for (int i = 0; i < outStreams.length; i++) {
      if (outStreams[i] != null) {
        outStreams[i].close();
      }
      outStreams[i] = null;
    }

    inStreams = new DataInputStream[numBuckets];
    outStreams = null;
    insertsAllowed = false;
    curBucket = 0;
    storeConfig();
  }

  /** Returns the (lazily opened) reader for a bucket, or null if its file does not exist. */
  private DataInputStream openBucketForRead(int bucket) throws IOException {
    if (inStreams[bucket] == null) {
      File file = bucketFile(bucket);
      if (!file.exists()) {
        return null;
      }
      inStreams[bucket] = new DataInputStream(new BufferedInputStream(new FileInputStream(file)));
    }
    return inStreams[bucket];
  }

  /**
   * Reads a record's 4-byte big-endian length prefix (the encoding of
   * {@link DataOutputStream#writeInt}).
   *
   * @return the length, or -1 if the stream is at a clean end of file
   * @throws IOException if the prefix is truncated or negative (corrupt bucket file)
   */
  private static int readItemLength(DataInputStream in) throws IOException {
    int b0 = in.read();
    if (b0 < 0) {
      return -1;
    }
    int b1 = in.read();
    int b2 = in.read();
    int b3 = in.read();
    if ((b1 | b2 | b3) < 0) {
      throw new EOFException("Truncated item length in bucket file");
    }
    int len = (b0 << 24) | (b1 << 16) | (b2 << 8) | b3;
    if (len < 0) {
      throw new IOException("Corrupt item length in bucket file: " + len);
    }
    return len;
  }

  /** Returns the file backing the given bucket. */
  private File bucketFile(int bucket) {
    return new File(bucketsDir, BUCKET_FILENAME + "." + bucket);
  }

  /**
   * Validates {@code keyBits} and returns the corresponding bucket count.
   *
   * @throws IOException if keyBits is negative or larger than {@link #MAX_KEY_BITS}
   */
  private static int bucketCountFor(int keyBits) throws IOException {
    if (keyBits > MAX_KEY_BITS) {
      throw new IOException("Parameter keyBits is too large: " + keyBits);
    }
    if (keyBits < 0) {
      throw new IOException("Parameter keyBits is negative: " + keyBits);
    }
    return 1 << keyBits;
  }

  /**
   * Computes an item's bucket: {@code keyBits} bits read MSB-first starting at the
   * most-significant bit of {@code item[keyStartByte]}, interpreted as an unsigned integer.
   * Throws ArrayIndexOutOfBoundsException if the item is too short to hold the key bits.
   */
  int computeBucket(byte item[]) {
    int bucketIndex = 0;

    for (int i = 0; i < keyBits; i++) {
      byte curByte = item[keyStartByte + (i / 8)];
      // Masking with 0x01 discards any sign-extension from shifting a negative byte.
      byte curBit = (byte) (0x01 & (curByte >> (7 - (i % 8))));

      bucketIndex = bucketIndex << 1;
      bucketIndex |= curBit;
    }

    return bucketIndex;
  }

  /** Reads keyStartByte, keyBits, curBucket and insertsAllowed from the config file. */
  void loadConfig() throws IOException {
    File configFile = new File(bucketsDir, CONFIG_FILENAME);
    DataInputStream dis = new DataInputStream(new FileInputStream(configFile));
    try {
      this.keyStartByte = dis.readInt();
      this.keyBits = dis.readInt();
      this.curBucket = dis.readInt();
      this.insertsAllowed = dis.readBoolean();
    } finally {
      dis.close();
    }
  }

  /** Writes keyStartByte, keyBits, curBucket and insertsAllowed to the config file. */
  void storeConfig() throws IOException {
    File configFile = new File(bucketsDir, CONFIG_FILENAME);
    DataOutputStream dos = new DataOutputStream(new FileOutputStream(configFile));
    try {
      dos.writeInt(keyStartByte);
      dos.writeInt(keyBits);
      dos.writeInt(curBucket);
      dos.writeBoolean(insertsAllowed);
    } finally {
      dos.close();
    }
  }
}