package net.nutch.indexer;

import java.io.*;
import java.security.*;
import java.text.*;
import java.util.*;
import java.util.logging.*;

import net.nutch.io.*;
import net.nutch.fs.*;
import net.nutch.util.*;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.document.Document;

/**
 * Deletes duplicate documents in a set of Lucene indexes.
 * Duplicates are documents with either the same content (as given by
 * their MD5 "digest" field) or the same URL.
 */
public class DeleteDuplicates {
  private static final Logger LOG =
    LogFormatter.getLogger("net.nutch.indexer.DeleteDuplicates");

  /** The record written for each indexed document: its hash, score,
   * index number, document number and url length. */
  public static class IndexedDoc implements WritableComparable {
    private MD5Hash hash = new MD5Hash();
    private float score;
    private int index;                            // the index containing this doc
    private int doc;                              // the document number in the index
    private int urlLen;

    public void write(DataOutput out) throws IOException {
      hash.write(out);
      out.writeFloat(score);
      out.writeInt(index);
      out.writeInt(doc);
      out.writeInt(urlLen);
    }

    public void readFields(DataInput in) throws IOException {
      hash.readFields(in);
      this.score = in.readFloat();
      this.index = in.readInt();
      this.doc = in.readInt();
      this.urlLen = in.readInt();
    }

    public int compareTo(Object o) {
      // Sorting is always done with the raw-byte comparators below.
      throw new RuntimeException("this is never used");
    }

    /** Order equal hashes by decreasing score, then by increasing url
     * length. */
    public static class ByHashScore extends WritableComparator {
      public ByHashScore() { super(IndexedDoc.class); }

      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int c = compareBytes(b1, s1, MD5Hash.MD5_LEN, b2, s2, MD5Hash.MD5_LEN);
        if (c != 0)
          return c;

        // The float score is serialized immediately after the hash.
        float thisScore = readFloat(b1, s1+MD5Hash.MD5_LEN);
        float thatScore = readFloat(b2, s2+MD5Hash.MD5_LEN);

        if (thisScore < thatScore)
          return 1;
        else if (thisScore > thatScore)
          return -1;

        // urlLen follows score, index and doc (4 bytes each).
        int thisUrlLen = readInt(b1, s1+MD5Hash.MD5_LEN+12);
        int thatUrlLen = readInt(b2, s2+MD5Hash.MD5_LEN+12);

        return thisUrlLen - thatUrlLen;
      }
    }

    /** Order equal hashes by decreasing index, then by decreasing
     * document number. */
    public static class ByHashDoc extends WritableComparator {
      public ByHashDoc() { super(IndexedDoc.class); }

      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int c = compareBytes(b1, s1, MD5Hash.MD5_LEN, b2, s2, MD5Hash.MD5_LEN);
        if (c != 0)
          return c;

        int thisIndex = readInt(b1, s1+MD5Hash.MD5_LEN+4);
        int thatIndex = readInt(b2, s2+MD5Hash.MD5_LEN+4);

        if (thisIndex != thatIndex)
          return thatIndex - thisIndex;

        int thisDoc = readInt(b1, s1+MD5Hash.MD5_LEN+8);
        int thatDoc = readInt(b2, s2+MD5Hash.MD5_LEN+8);

        return thatDoc - thisDoc;
      }
    }
  }

  /** Computes the hash under which a document is deduplicated. */
  private interface Hasher {
    void updateHash(MD5Hash hash, Document doc);
  }

  private IndexReader[] readers;
  private File tempFile;

  /** Constructs a duplicate deleter for the provided indexes. */
  public DeleteDuplicates(IndexReader[] readers, File workingDir)
    throws IOException {
    this.readers = readers;
    this.tempFile = new File(workingDir, "ddup-" +
      new SimpleDateFormat("yyyyMMddHHmmss")
        .format(new Date(System.currentTimeMillis())));
  }

  /** Closes the indexes, saving changes, and removes the temporary file. */
  public void close() throws IOException {
    for (int i = 0; i < readers.length; i++) {
      readers[i].close();
    }
    tempFile.delete();
  }

  /** Deletes duplicate content.  Of documents with the same content,
   * the one with the highest score is kept. */
  public void deleteContentDuplicates() throws IOException {
    LOG.info("Reading content hashes...");
    computeHashes(new Hasher() {
        public void updateHash(MD5Hash hash, Document doc) {
          hash.setDigest(doc.get("digest"));
        }
      });

    LOG.info("Sorting content hashes...");
    SequenceFile.Sorter byHashScoreSorter =
      new SequenceFile.Sorter(new LocalFileSystem(),
                              new IndexedDoc.ByHashScore(),
                              NullWritable.class);
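    // ByHashScore groups records by content hash and, within a group,
    // puts the highest-scoring document first (ties broken by shorter
    // URL).  Since deleteDuplicates() keeps only the first record of
    // each group, the best-scoring copy of each page survives.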
    byHashScoreSorter.sort(tempFile.getPath(), tempFile.getPath() + ".sorted");

    LOG.info("Deleting content duplicates...");
    int duplicateCount = deleteDuplicates();
    LOG.info("Deleted " + duplicateCount + " content duplicates.");
  }

  /** Deletes duplicate urls.  Of documents with the same url, the most
   * recently fetched (highest index, then highest document number) is
   * kept. */
  public void deleteUrlDuplicates() throws IOException {
    final MessageDigest digest;
    try {
      digest = MessageDigest.getInstance("MD5");
    } catch (Exception e) {
      throw new RuntimeException(e.toString());
    }

    LOG.info("Reading url hashes...");
    computeHashes(new Hasher() {
        public void updateHash(MD5Hash hash, Document doc) {
          try {
            digest.update(UTF8.getBytes(doc.get("url")));
            digest.digest(hash.getDigest(), 0, MD5Hash.MD5_LEN);
          } catch (Exception e) {
            throw new RuntimeException(e.toString());
          }
        }
      });

    LOG.info("Sorting url hashes...");
    SequenceFile.Sorter byHashDocSorter =
      new SequenceFile.Sorter(new LocalFileSystem(),
                              new IndexedDoc.ByHashDoc(),
                              NullWritable.class);
    byHashDocSorter.sort(tempFile.getPath(), tempFile.getPath() + ".sorted");

    LOG.info("Deleting url duplicates...");
    int duplicateCount = deleteDuplicates();
    LOG.info("Deleted " + duplicateCount + " url duplicates.");
  }

  /** Writes an IndexedDoc record for every undeleted document in every
   * index, hashing each document with the given Hasher. */
  private void computeHashes(Hasher hasher) throws IOException {
    IndexedDoc indexedDoc = new IndexedDoc();

    SequenceFile.Writer writer =
      new SequenceFile.Writer(new LocalFileSystem(), tempFile.getPath(),
                              IndexedDoc.class, NullWritable.class);
    try {
      for (int index = 0; index < readers.length; index++) {
        IndexReader reader = readers[index];
        int readerMax = reader.maxDoc();
        indexedDoc.index = index;
        for (int doc = 0; doc < readerMax; doc++) {
          if (!reader.isDeleted(doc)) {
            Document document = reader.document(doc);
            hasher.updateHash(indexedDoc.hash, document);
            indexedDoc.score = Float.parseFloat(document.get("boost"));
            indexedDoc.doc = doc;
            indexedDoc.urlLen = document.get("url").length();
            writer.append(indexedDoc, NullWritable.get());
          }
        }
      }
    } finally {
      writer.close();
    }
  }

  /** Scans the sorted records and deletes every document whose hash
   * equals that of the previous record, i.e. everything but the first
   * record of each group of duplicates. */
  private int deleteDuplicates() throws IOException {
    // Replace the unsorted temp file with the sorted one.
    if (tempFile.exists()) {
      tempFile.delete();
    }
    if (!new File(tempFile.getPath() + ".sorted").renameTo(tempFile)) {
      throw new IOException("Couldn't rename!");
    }

    IndexedDoc indexedDoc = new IndexedDoc();
    SequenceFile.Reader reader =
      new SequenceFile.Reader(new LocalFileSystem(), tempFile.getPath());
    try {
      int duplicateCount = 0;
      MD5Hash prevHash = null;                    // hash of the previous record
      while (reader.next(indexedDoc, NullWritable.get())) {
        if (prevHash == null) {                   // initialize prevHash
          prevHash = new MD5Hash();
          prevHash.set(indexedDoc.hash);
          continue;
        }
        if (indexedDoc.hash.equals(prevHash)) {   // found a duplicate
          readers[indexedDoc.index].delete(indexedDoc.doc);
          duplicateCount++;
        } else {
          prevHash.set(indexedDoc.hash);          // update prevHash
        }
      }
      return duplicateCount;
    } finally {
      reader.close();
      tempFile.delete();
    }
  }

  /** Deletes duplicates in the indexes of the named segments directory. */
  public static void main(String[] args) throws Exception {
    String usage = "DeleteDuplicates (-local | -ndfs <namenode:port>) " +
      "[-workingdir <workingdir>] <segmentsDir>";
    if (args.length < 2) {
      System.err.println("Usage: " + usage);
      return;
    }

    NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0);
    File workingDir = new File(new File("").getCanonicalPath());
    try {
      // An optional -workingdir flag overrides the default working
      // directory (the current directory).
      int j = 0;
      if ("-workingdir".equals(args[j])) {
        j++;
        workingDir = new File(new File(args[j++]).getCanonicalPath());
      }
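      // The indexes are processed as local copies: startLocalOutput()
      // stages each segment index in the working directory when the
      // NutchFileSystem is not local, and completeLocalOutput() later
      // returns the modified index to the NFS (see the putback loop
      // below), so deletions run against local disk.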
      workingDir = new File(workingDir, "ddup-workingdir");

      String segmentsDir = args[j++];
      File[] directories = nfs.listFiles(new File(segmentsDir));
      Vector vReaders = new Vector();
      Vector putbackList = new Vector();
      int maxDoc = 0;

      for (int i = 0; i < directories.length; i++) {
        // Only process segments whose indexing is marked complete.
        File indexDone = new File(directories[i], IndexSegment.DONE_NAME);
        if (nfs.exists(indexDone) && nfs.isFile(indexDone)) {
          File indexDir = new File(directories[i], "index");
          File tmpDir = new File(workingDir, "ddup-" +
            new SimpleDateFormat("yyyyMMddHHmmss")
              .format(new Date(System.currentTimeMillis())));
          File localIndexDir = nfs.startLocalOutput(indexDir, tmpDir);

          putbackList.add(indexDir);
          putbackList.add(tmpDir);

          IndexReader reader = IndexReader.open(localIndexDir);
          // Undo deletions left over from any earlier run so results
          // are repeatable.
          if (reader.hasDeletions()) {
            LOG.info("Clearing old deletions in " + indexDir +
                     " (" + localIndexDir + ")");
            reader.undeleteAll();
          }
          maxDoc += reader.maxDoc();
          vReaders.add(reader);
        }
      }

      IndexReader[] readers = new IndexReader[vReaders.size()];
      for (int i = 0; vReaders.size() > 0; i++) {
        readers[i] = (IndexReader)vReaders.remove(0);
      }

      if (workingDir.exists()) {
        FileUtil.fullyDelete(workingDir);
      }
      workingDir.mkdirs();
      DeleteDuplicates dd = new DeleteDuplicates(readers, workingDir);
      dd.deleteUrlDuplicates();
      dd.deleteContentDuplicates();
      dd.close();

      LOG.info("Duplicate deletion complete locally.  Now returning to NFS...");
      // Push each modified local index back to its place in the NFS.
      for (Iterator it = putbackList.iterator(); it.hasNext(); ) {
        File indexDir = (File)it.next();
        File tmpDir = (File)it.next();
        nfs.completeLocalOutput(indexDir, tmpDir);
      }
      LOG.info("DeleteDuplicates complete");
      FileUtil.fullyDelete(workingDir);
    } finally {
      nfs.close();
    }
  }
}