1 2 3 package net.nutch.db; 4 5 import java.io.*; 6 import java.util.*; 7 import java.util.logging.*; 8 9 import net.nutch.io.*; 10 import net.nutch.fs.*; 11 import net.nutch.util.*; 12 13 23 public class EditSectionGroupReader { 24 static final Logger LOG = LogFormatter.getLogger("net.nutch.db.EditSectionGroupReader"); 25 private final static String MERGED_EDITS = "merged_edits"; 26 private final static int SLEEP_INTERVAL = 3000; 27 private final static int WORRY_INTERVALS = 5; 28 29 NutchFileSystem nfs; 30 String label; 31 int readerNum = -1, totalMachines = -1, numEdits = 0; 32 boolean sectionComplete = false; 33 34 37 public EditSectionGroupReader(NutchFileSystem nfs, String label, int readerNum, int totalMachines) { 38 this.nfs = nfs; 39 this.label = label; 40 this.readerNum = readerNum; 41 this.totalMachines = totalMachines; 42 } 43 44 50 private synchronized void sectionComplete() throws IOException { 51 if (! sectionComplete) { 52 for (int i = 0; i < totalMachines; i++) { 57 File allEditsDir = new File("editsection." + readerNum, "editsdir." + i); 59 File editsDir = new File(allEditsDir, label); 60 File editsList = new File(editsDir, "editslist"); 61 File editsInfo = new File(editsDir, "editsinfo"); 62 63 while (! nfs.exists(editsInfo)) { 65 try { 66 Thread.sleep(2000); 67 } catch (InterruptedException ie) { 68 } 69 } 70 71 DataInputStream in = new DataInputStream(nfs.open(editsInfo)); 73 try { 74 in.read(); this.numEdits += in.readInt(); } finally { 77 in.close(); 78 } 79 } 80 sectionComplete = true; 81 } 82 } 83 84 85 89 public int numEdits() throws IOException { 90 sectionComplete(); 91 return numEdits; 92 } 93 94 99 public void mergeSectionComponents(File mergedEditsFile) throws IOException { 100 sectionComplete(); 102 103 File allEdits0 = new File("editsection." + readerNum, "editsdir." + 0); 107 File editsDir0 = new File(allEdits0, label); 108 File editsList0 = new File(editsDir0, "editslist"); 109 while (! nfs.exists(editsList0)) { 110 try { 111 Thread.sleep(2000); 112 } catch (InterruptedException ie) { 113 } 114 } 115 116 SequenceFile.Reader test = new SequenceFile.Reader(nfs, editsList0.getPath()); 117 Class keyClass = null; 118 try { 119 keyClass = test.getKeyClass(); 120 } finally { 121 test.close(); 122 } 123 124 try { 128 Writable key = (Writable) keyClass.newInstance(); 129 SequenceFile.Writer out = new SequenceFile.Writer(nfs, mergedEditsFile.getPath(), keyClass, NullWritable.class); 130 131 try { 132 for (int i = 0; i < totalMachines; i++) { 133 File allEditsDir = new File("editsection." + readerNum, "editsdir." + i); 134 File editsDir = new File(allEditsDir, label); 135 File editsList = new File(editsDir, "editslist"); 136 while (! nfs.exists(editsList)) { 137 try { 138 Thread.sleep(2000); 139 } catch (InterruptedException ie) { 140 } 141 } 142 143 SequenceFile.Reader in = new SequenceFile.Reader(nfs, editsList.getPath()); 144 try { 145 while (in.next(key)) { 146 out.append(key, NullWritable.get()); 147 } 148 } finally { 149 in.close(); 150 } 151 } 152 } finally { 153 out.close(); 154 } 155 } catch (InstantiationException ie) { 156 throw new IOException("Could not create instance of " + keyClass); 157 } catch (IllegalAccessException iae) { 158 throw new IOException("Could not create instance of " + keyClass); 159 } 160 } 161 162 165 public void delete() throws IOException { 166 for (int i = 0; i < totalMachines; i++) { 167 File editsDir = new File("editsection." + readerNum, "editsdir." + i); 169 File consumedEdits = new File(editsDir, label); 170 nfs.delete(consumedEdits); 171 } 172 } 173 } 174 | Popular Tags |