/* Copyright (c) 2003-2004 The Nutch Organization. All rights reserved. */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */

package net.nutch.indexer;

import java.io.*;
import java.security.*;
import java.text.*;
import java.util.*;
import java.util.logging.*;

import net.nutch.io.*;
import net.nutch.fs.*;
import net.nutch.util.*;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.document.Document;

/******************************************************************
 * Deletes duplicate documents in a set of Lucene indexes.
 * Duplicates have either the same contents (via MD5 hash) or the same URL.
 *
 * @author Doug Cutting
 * @author Mike Cafarella
 ******************************************************************/

public class DeleteDuplicates {
  private static final Logger LOG =
    LogFormatter.getLogger("net.nutch.indexer.DeleteDuplicates");

  /********************************************************
   * The key used in sorting for duplicates.
   *******************************************************/
  public static class IndexedDoc implements WritableComparable {
    private MD5Hash hash = new MD5Hash();
    private float score;
    private int index;                        // the segment index
    private int doc;                          // within the index
    private int urlLen;
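
    // Serialized layout (written by write() below, read back at fixed offsets
    // by the raw comparators): MD5_LEN bytes of MD5 hash, then score (float),
    // index (int), doc (int) and urlLen (int), four bytes each.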
    public void write(DataOutput out) throws IOException {
      hash.write(out);
      out.writeFloat(score);
      out.writeInt(index);
      out.writeInt(doc);
      out.writeInt(urlLen);
    }

    public void readFields(DataInput in) throws IOException {
      hash.readFields(in);
      this.score = in.readFloat();
      this.index = in.readInt();
      this.doc = in.readInt();
      this.urlLen = in.readInt();
    }

    public int compareTo(Object o) {
      throw new RuntimeException("this is never used");
    }

    /**
     * Order equal hashes by decreasing score and increasing urlLen.
     */
    public static class ByHashScore extends WritableComparator {
      public ByHashScore() { super(IndexedDoc.class); }

      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
        int c = compareBytes(b1, s1, MD5Hash.MD5_LEN, b2, s2, MD5Hash.MD5_LEN);
        if (c != 0)
          return c;

        float thisScore = readFloat(b1, s1+MD5Hash.MD5_LEN);
        float thatScore = readFloat(b2, s2+MD5Hash.MD5_LEN);

        if (thisScore < thatScore)
          return 1;
        else if (thisScore > thatScore)
          return -1;

        int thisUrlLen = readInt(b1, s1+MD5Hash.MD5_LEN+12);
        int thatUrlLen = readInt(b2, s2+MD5Hash.MD5_LEN+12);

        return thisUrlLen - thatUrlLen;
      }
    }

    /**
     * Order equal hashes by decreasing index and document.
     */
    public static class ByHashDoc extends WritableComparator {
      public ByHashDoc() { super(IndexedDoc.class); }

      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
        int c = compareBytes(b1, s1, MD5Hash.MD5_LEN, b2, s2, MD5Hash.MD5_LEN);
        if (c != 0)
          return c;

        int thisIndex = readInt(b1, s1+MD5Hash.MD5_LEN+4);
        int thatIndex = readInt(b2, s2+MD5Hash.MD5_LEN+4);

        if (thisIndex != thatIndex)
          return thatIndex - thisIndex;

        int thisDoc = readInt(b1, s1+MD5Hash.MD5_LEN+8);
        int thatDoc = readInt(b2, s2+MD5Hash.MD5_LEN+8);

        return thatDoc - thisDoc;
      }
    }
  }
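
  // Note: deleteDuplicates() below keeps only the first record in each run of
  // equal hashes, so sorting with ByHashScore preserves the highest-scoring
  // (and, on ties, shortest-URL) page, while ByHashDoc preserves the page
  // with the highest segment index and document number.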

  /*****************************************************
   * Computes the hash for an indexed document.
   ****************************************************/
  private interface Hasher {
    void updateHash(MD5Hash hash, Document doc);
  }

  //////////////////////////////////////////////////////
  // DeleteDuplicates class
  //////////////////////////////////////////////////////
  private IndexReader[] readers;
  private File tempFile;

  /**
   * Constructs a duplicate detector for the provided indexes.
   */
  public DeleteDuplicates(IndexReader[] readers, File workingDir) throws IOException {
    this.readers = readers;
    this.tempFile = new File(workingDir, "ddup-" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System.currentTimeMillis())));
  }

  /**
   * Closes the indexes, saving changes.
   */
  public void close() throws IOException {
    for (int i = 0; i < readers.length; i++) {
        readers[i].close();
    }
    tempFile.delete();
  }

  /**
   * Delete pages with duplicate content hashes. Of those with the same
   * content hash, keep the page with the highest score.
   */
  public void deleteContentDuplicates() throws IOException {
    LOG.info("Reading content hashes...");
    computeHashes(new Hasher() {
        public void updateHash(MD5Hash hash, Document doc) {
          hash.setDigest(doc.get("digest"));
        }
      });

    LOG.info("Sorting content hashes...");
    SequenceFile.Sorter byHashScoreSorter =
      new SequenceFile.Sorter(new LocalFileSystem(), new IndexedDoc.ByHashScore(), NullWritable.class);
    byHashScoreSorter.sort(tempFile.getPath(), tempFile.getPath() + ".sorted");

    LOG.info("Deleting content duplicates...");
    int duplicateCount = deleteDuplicates();
    LOG.info("Deleted " + duplicateCount + " content duplicates.");
  }

  /**
   * Delete pages with duplicate URLs. Of those with the same
   * URL, keep the most recently fetched page.
   */
  public void deleteUrlDuplicates() throws IOException {
    final MessageDigest digest;
    try {
      digest = MessageDigest.getInstance("MD5");
    } catch (Exception e) {
      throw new RuntimeException(e.toString());
    }

    LOG.info("Reading url hashes...");
    computeHashes(new Hasher() {
        public void updateHash(MD5Hash hash, Document doc) {
          try {
            digest.update(UTF8.getBytes(doc.get("url")));
            digest.digest(hash.getDigest(), 0, MD5Hash.MD5_LEN);
          } catch (Exception e) {
            throw new RuntimeException(e.toString());
          }
        }
      });

    LOG.info("Sorting url hashes...");
    SequenceFile.Sorter byHashDocSorter =
      new SequenceFile.Sorter(new LocalFileSystem(), new IndexedDoc.ByHashDoc(), NullWritable.class);
    byHashDocSorter.sort(tempFile.getPath(), tempFile.getPath() + ".sorted");

    LOG.info("Deleting url duplicates...");
    int duplicateCount = deleteDuplicates();
    LOG.info("Deleted " + duplicateCount + " url duplicates.");
  }

  /**
   * Compute hashes over all the input indices
   */
  private void computeHashes(Hasher hasher) throws IOException {
    IndexedDoc indexedDoc = new IndexedDoc();

    SequenceFile.Writer writer =
      new SequenceFile.Writer(new LocalFileSystem(), tempFile.getPath(), IndexedDoc.class, NullWritable.class);
    try {
      for (int index = 0; index < readers.length; index++) {
        IndexReader reader = readers[index];
        int readerMax = reader.maxDoc();
        indexedDoc.index = index;
        for (int doc = 0; doc < readerMax; doc++) {
          if (!reader.isDeleted(doc)) {
            Document document = reader.document(doc);
            hasher.updateHash(indexedDoc.hash, document);
            indexedDoc.score = Float.parseFloat(document.get("boost"));
            indexedDoc.doc = doc;
            indexedDoc.urlLen = document.get("url").length();
            writer.append(indexedDoc, NullWritable.get());
          }
        }
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Actually remove the duplicates from the indices
   */
  private int deleteDuplicates() throws IOException {
      if (tempFile.exists()) {
          tempFile.delete();
      }
      if (!new File(tempFile.getPath() + ".sorted").renameTo(tempFile)) {
          throw new IOException("Couldn't rename!");
      }

      IndexedDoc indexedDoc = new IndexedDoc();
      SequenceFile.Reader reader = new SequenceFile.Reader(new LocalFileSystem(), tempFile.getPath());
      try {
          int duplicateCount = 0;
          MD5Hash prevHash = null;                              // previous hash
          while (reader.next(indexedDoc, NullWritable.get())) {
              if (prevHash == null) {                           // initialize prevHash
                  prevHash = new MD5Hash();
                  prevHash.set(indexedDoc.hash);
                  continue;
              }
              if (indexedDoc.hash.equals(prevHash)) {           // found a duplicate
                  readers[indexedDoc.index].delete(indexedDoc.doc); // delete it
                  duplicateCount++;
              } else {
                  prevHash.set(indexedDoc.hash);                // reset prevHash
              }
          }
          return duplicateCount;
      } finally {
          reader.close();
          tempFile.delete();
      }
  }

  /**
   * Delete duplicates in the indexes in the named directory.
   */
  public static void main(String[] args) throws Exception {
    //
    // Usage, arg checking
    //
    String usage = "DeleteDuplicates (-local | -ndfs <namenode:port>) [-workingdir <workingdir>] <segmentsDir>";
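    // Example invocation, assuming the compiled classes are on the classpath
    // and a hypothetical local segments directory:
    //   java net.nutch.indexer.DeleteDuplicates -local crawl/segments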
    if (args.length < 2) {
      System.err.println("Usage: " + usage);
      return;
    }

    NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0);
    File workingDir = new File(new File("").getCanonicalPath());
    try {
        //
        // Build an array of IndexReaders for all the segments we want to process
        //
        int j = 0;
        if ("-workingdir".equals(args[j])) {
            j++;
            workingDir = new File(new File(args[j++]).getCanonicalPath());
        }
        workingDir = new File(workingDir, "ddup-workingdir");

        String segmentsDir = args[j++];
        File[] directories = nfs.listFiles(new File(segmentsDir));
        Vector vReaders = new Vector();
        Vector putbackList = new Vector();
        int maxDoc = 0;

        for (int i = 0; i < directories.length; i++) {
            //
            // Make sure the index has been completed
            //
            File indexDone = new File(directories[i], IndexSegment.DONE_NAME);
            if (nfs.exists(indexDone) && nfs.isFile(indexDone)) {
                //
                // Make sure the specified segment can be processed locally
                //
                File indexDir = new File(directories[i], "index");
                File tmpDir = new File(workingDir, "ddup-" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System.currentTimeMillis())));
                File localIndexDir = nfs.startLocalOutput(indexDir, tmpDir);

                putbackList.add(indexDir);
                putbackList.add(tmpDir);

                //
                // Construct the reader
                //
                IndexReader reader = IndexReader.open(localIndexDir);
                if (reader.hasDeletions()) {
                    LOG.info("Clearing old deletions in " + indexDir + " (" + localIndexDir + ")");
                    reader.undeleteAll();
                }
                maxDoc += reader.maxDoc();
                vReaders.add(reader);
            }
        }

        //
        // Now build the DeleteDuplicates object, and complete
        //
        IndexReader[] readers = new IndexReader[vReaders.size()];
        for (int i = 0; vReaders.size() > 0; i++) {
            readers[i] = (IndexReader) vReaders.remove(0);
        }

        if (workingDir.exists()) {
            FileUtil.fullyDelete(workingDir);
        }
        workingDir.mkdirs();
        DeleteDuplicates dd = new DeleteDuplicates(readers, workingDir);
        dd.deleteUrlDuplicates();
        dd.deleteContentDuplicates();
        dd.close();

        //
        // Dups have been deleted. Now make sure they are placed back to NFS
        //
        LOG.info("Duplicate deletion complete locally. Now returning to NFS...");
        for (Iterator it = putbackList.iterator(); it.hasNext(); ) {
            File indexDir = (File) it.next();
            File tmpDir = (File) it.next();
            nfs.completeLocalOutput(indexDir, tmpDir);
        }
        LOG.info("DeleteDuplicates complete");
        FileUtil.fullyDelete(workingDir);
    } finally {
        nfs.close();
    }
  }
}
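
For reference, a minimal sketch of how the class above might be driven programmatically rather than through main(); the segment path and working directory are hypothetical, and the IndexReader.open(File) call assumes the Lucene 1.x API already used by this class:

    // Open the per-segment index to deduplicate (hypothetical path).
    IndexReader[] readers = new IndexReader[] {
        IndexReader.open(new File("crawl/segments/20040501120000/index"))
    };
    File workingDir = new File("/tmp/ddup-work");   // hypothetical scratch directory

    DeleteDuplicates dd = new DeleteDuplicates(readers, workingDir);
    dd.deleteUrlDuplicates();       // keep the most recently fetched page per URL
    dd.deleteContentDuplicates();   // keep the highest-scoring page per content hash
    dd.close();                     // closes the readers, committing the deletions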