KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > db > BucketSet


1 /* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3
4 package net.nutch.db;
5
6 import java.io.*;
7 import java.util.*;
8 import java.util.logging.*;
9
10 import net.nutch.io.*;
11 import net.nutch.pagedb.*;
12 import net.nutch.linkdb.*;
13
/********************************************
 * A BucketSet holds many buckets, full of instructions.
 *
 * Once created, it can be given a set of byte arrays.  They
 * will be divided into a handy number of buckets.  The BucketSet
 * will look at a certain bit range within the item to decide
 * which bucket will get the item.  Ideally, your input data
 * will show an even distribution in the range that you
 * indicate.
 *
 * On disk, a BucketSet is a directory holding one small config
 * file plus one file per non-empty bucket.  Each bucket file is
 * a sequence of (4-byte length, item bytes) records.
 *
 * Once you request a single item via getNextItem(), you cannot
 * again add items to the BucketSet.  You can start reading from
 * the beginning of the BucketSet again by recreating it with
 * loadBuckets().
 *
 * Once you call close(), the BucketSet cannot be read from
 * or written to.  Note that close() also deletes the bucket
 * files, the config file and the directory itself, so call it
 * only when you are completely done with the data.
 *
 * @author Mike Cafarella
 ********************************************/
class BucketSet {
    final static String BUCKET_FILENAME = "bucket";
    final static String CONFIG_FILENAME = "config";
    final static int INTEGER_SIZE = 32;

    // Largest keyBits for which (1 << keyBits) is still a
    // positive int, and hence a legal array size.
    private final static int MAX_KEY_BITS = INTEGER_SIZE - 2;

    /**
     * Create a new BucketSet from a data set that is already there.
     * You can start adding to the BucketSet again, as long as
     * you have not yet made a call to getNextItem().
     */
    public static BucketSet loadBuckets(File bucketsDir) throws IOException {
        return new BucketSet(bucketsDir);
    }

    /**
     * Create a brand-new BucketSet, at the given File location,
     * with the appropriate parameters.  If the file already exists,
     * then this will fail.
     */
    public static BucketSet createBuckets(File bucketsDir, int keyStartByte, int keyBits) throws IOException {
        return new BucketSet(bucketsDir, keyStartByte, keyBits);
    }

    // Persistent values (written to / read from the config file)
    File bucketsDir;
    int keyStartByte, keyBits, curBucket;
    boolean insertsAllowed;

    // Transient values
    DataInputStream inStreams[];
    DataOutputStream outStreams[];
    int numBuckets;
    boolean closed;

    /**
     * Load an old BucketSet, at bucketsDir.
     *
     * @throws IOException if bucketsDir is not an existing directory,
     * or if its config file cannot be read or holds an illegal keyBits.
     */
    BucketSet(File bucketsDir) throws IOException {
        if (! (bucketsDir.exists() && bucketsDir.isDirectory())) {
            throw new IOException("File " + bucketsDir + " either does not exist or is not a directory");
        }
        this.bucketsDir = bucketsDir;
        loadConfig();

        // numBuckets is not stored in the config file; it must be
        // derived from keyBits, otherwise the stream arrays below
        // would be empty and the set unusable.
        this.numBuckets = bucketCountFor(keyBits);

        if (insertsAllowed) {
            this.outStreams = new DataOutputStream[numBuckets];
        } else {
            this.inStreams = new DataInputStream[numBuckets];
        }
        this.closed = false;
    }

    /**
     * Create a new BucketSet at bucketsDir, with the given parameters.
     * The number of buckets is determined by how many bits of the
     * key you allocate toward bucket-selection (2^keyBits buckets).
     *
     * @param bucketsDir directory to create; must not exist yet
     * @param keyStartByte index of the first item byte used as key
     * @param keyBits number of key bits used to select a bucket
     * @throws IOException if a parameter is out of range, if bucketsDir
     * already exists, or if it cannot be created or written to.
     */
    BucketSet(File bucketsDir, int keyStartByte, int keyBits) throws IOException {
        // Validate everything before touching the file system.
        if (keyStartByte < 0) {
            throw new IOException("Parameter keyStartByte is negative: " + keyStartByte);
        }
        int bucketCount = bucketCountFor(keyBits);

        if (bucketsDir.exists()) {
            throw new IOException("Directory " + bucketsDir + " is already present.");
        }
        if (! bucketsDir.mkdir()) {
            throw new IOException("Could not create directory " + bucketsDir);
        }

        this.bucketsDir = bucketsDir;

        // Persistent values
        this.keyStartByte = keyStartByte;
        this.keyBits = keyBits;
        this.curBucket = 0;
        this.insertsAllowed = true;
        storeConfig();

        // Transient ones.  A brand-new set always starts in insert mode.
        this.numBuckets = bucketCountFor(keyBits);
        this.outStreams = new DataOutputStream[bucketCount];
        this.closed = false;
    }

    /**
     * Translate a keyBits value into a bucket count, rejecting
     * values for which 2^keyBits is not a usable array size.
     */
    private static int bucketCountFor(int keyBits) throws IOException {
        if (keyBits < 0) {
            throw new IOException("Parameter keyBits is negative: " + keyBits);
        }
        if (keyBits > MAX_KEY_BITS) {
            throw new IOException("Parameter keyBits is too large: " + keyBits);
        }
        return 1 << keyBits;
    }

    /**
     * Close down the BucketSet.  No more reading or writing.
     *
     * All open streams are closed, and then the bucket files, the
     * config file and the directory are deleted from disk.
     *
     * @throws IOException if the BucketSet was already closed, or
     * if closing one of the streams fails.
     */
    public void close() throws IOException {
        if (closed) {
            throw new IOException("BucketSet closed");
        }
        for (int i = 0; i < numBuckets; i++) {
            if (insertsAllowed) {
                if (outStreams[i] != null) {
                    outStreams[i].close();
                }
            } else {
                if (inStreams[i] != null) {
                    inStreams[i].close();
                }
            }
            // Deletion is best-effort; buckets that never received
            // an item have no file at all.
            new File(bucketsDir, BUCKET_FILENAME + "." + i).delete();
        }
        new File(bucketsDir, CONFIG_FILENAME).delete();
        bucketsDir.delete();

        inStreams = null;
        outStreams = null;
        closed = true;
    }

    /**
     * Write the given byte array to the BucketSet.
     *
     * Use the array's key region to decide which bucket
     * will get it.
     *
     * @throws IOException if the BucketSet is closed, if reading
     * has already begun, or if the write fails.
     */
    public void storeItem(byte item[]) throws IOException {
        if (closed) {
            throw new IOException("BucketSet closed");
        }
        if (! insertsAllowed) {
            throw new IOException("Insert no longer allowed to this BucketSet");
        }

        // Before we can store the item, we first need to
        // compute which bucket to use
        int bucket = computeBucket(item);
        if (outStreams[bucket] == null) {
            // Open in append mode: for a reloaded BucketSet the bucket
            // file may already hold items, which must not be truncated.
            // For a brand-new set the file does not exist yet, so
            // appending is the same as creating.
            File bucketFile = new File(bucketsDir, BUCKET_FILENAME + "." + bucket);
            outStreams[bucket] = new DataOutputStream(new FileOutputStream(bucketFile, true));
        }

        // Record format: 4-byte length, then the raw bytes.
        outStreams[bucket].writeInt(item.length);
        outStreams[bucket].write(item, 0, item.length);
    }

    /**
     * Return the next item in the current bucket.
     *
     * If we're at the end of a bucket, jump silently to the next.
     *
     * If we're at the end of all buckets, return null.
     *
     * The first call switches the BucketSet from insert mode to
     * read mode; after that, storeItem() is no longer allowed.
     *
     * @throws IOException if the BucketSet is closed, if a bucket
     * file cannot be read, or if a bucket file is corrupt.
     */
    public byte[] getNextItem() throws IOException {
        //
        // Phase 0.  Check to make sure it's OK.
        //

        // Make sure we're not closed
        if (closed) {
            throw new IOException("BucketSet closed");
        }
        // If this is the first call to getNextItem(), we need to prepare!
        if (insertsAllowed) {
            // Close down all outstreams, open instreams
            for (int i = 0; i < outStreams.length; i++) {
                if (outStreams[i] != null) {
                    outStreams[i].close();
                }
                outStreams[i] = null;
            }

            inStreams = new DataInputStream[numBuckets];
            outStreams = null;
            insertsAllowed = false;
            curBucket = 0;
            // Persist the mode switch, so a reloaded BucketSet
            // also refuses further inserts.
            storeConfig();
        }

        //
        // Phase 1.  Find the right bucket
        //

        // Move through all buckets till we find something to read
        int i = 0, itemLen = -1;
        for (i = curBucket; i < numBuckets; i++) {
            // First, open stream if necessary.  A missing file
            // just means the bucket never received an item.
            if (inStreams[i] == null) {
                File bucketFile = new File(bucketsDir, BUCKET_FILENAME + "." + i);
                if (! bucketFile.exists()) {
                    continue;
                }
                inStreams[i] = new DataInputStream(new FileInputStream(bucketFile));
            }

            // Second, read from stream how many bytes are in item.
            // If we hit the end of the stream, we continue to the
            // next one.  (The stream wraps a FileInputStream directly,
            // for which available() reports the bytes left in the file.)
            if (inStreams[i].available() == 0) {
                inStreams[i].close();
                inStreams[i] = null;
            } else {
                itemLen = inStreams[i].readInt();
                break;
            }
        }

        //
        // Phase 2.  Remember the bucket, and read the next item
        //

        // Remember where we stopped
        curBucket = i;

        // Check to see if we found an item, or if we have hit
        // the end of the bucket set.
        if (i >= numBuckets) {
            // We have no more buckets!
            return null;
        }

        // A negative length can only come from a damaged file; report
        // it instead of pretending the data simply ended.
        if (itemLen < 0) {
            throw new IOException("Corrupt bucket file for bucket " + i + ": negative item length " + itemLen);
        }

        byte newItem[] = new byte[itemLen];
        inStreams[i].readFully(newItem);
        return newItem;
    }

    /**
     * Compute which bucket to use, given the input data
     * (and configuration params).  Return bucket index.
     *
     * The index is built from keyBits consecutive bits of the
     * item, starting at the most significant bit of byte
     * keyStartByte.
     */
    int computeBucket(byte item[]) {
        int bucketIndex = 0;

        for (int i = 0; i < keyBits; i++) {
            byte curByte = item[keyStartByte + (i / 8)];
            // Pick bit (i % 8) of the byte, counting from the top bit.
            // The mask also discards the sign-extension of the shift.
            byte curBit = (byte) (0x01 & (curByte >> (7 - (i % 8))));

            bucketIndex = bucketIndex << 1;
            bucketIndex |= curBit;
        }

        return bucketIndex;
    }

    /**
     * Load the BucketSet config file
     */
    void loadConfig() throws IOException {
        File configFile = new File(bucketsDir, CONFIG_FILENAME);
        DataInputStream dis = new DataInputStream(new FileInputStream(configFile));
        try {
            // Field order must match storeConfig().
            this.keyStartByte = dis.readInt();
            this.keyBits = dis.readInt();
            this.curBucket = dis.readInt();
            this.insertsAllowed = dis.readBoolean();
        } finally {
            dis.close();
        }
    }

    /**
     * Store values out to the BucketSet config file
     */
    void storeConfig() throws IOException {
        File configFile = new File(bucketsDir, CONFIG_FILENAME);
        DataOutputStream dos = new DataOutputStream(new FileOutputStream(configFile));
        try {
            // Field order must match loadConfig().
            dos.writeInt(keyStartByte);
            dos.writeInt(keyBits);
            dos.writeInt(curBucket);
            dos.writeBoolean(insertsAllowed);
        } finally {
            dos.close();
        }
    }
}
304
305
Popular Tags