1 46 package org.mr.core.persistent.file; 47 48 import java.io.File ; 49 import java.io.IOException ; 50 import java.io.FilenameFilter ; 51 import java.nio.ByteBuffer ; 52 import java.util.*; 53 54 import org.apache.commons.logging.Log; 55 import org.apache.commons.logging.LogFactory; 56 import org.mr.MantaAgent; 57 import org.mr.kernel.services.topics.VirtualTopicManager; 58 import org.mr.core.persistent.PersistentConst; 59 import org.mr.core.persistent.PersistentManager; 60 import org.mr.core.util.byteable.Byteable; 61 import org.mr.core.util.byteable.ByteableInputStream; 62 import org.mr.core.util.byteable.ByteableOutputStream; 63 import org.mr.core.util.StringUtils; 64 65 71 public class MantaFileManager implements PersistentManager { 72 73 private int numberOfFatFiles = PersistentManager.MAX_FILES_PER_STORAGE/MantaFatFile.MAX_FILES_IN_FAT; 74 private MantaFatFile fats[] = new MantaFatFile[numberOfFatFiles]; 75 private String persistentName = null; 76 private static Log log; 77 static Timer cleaner = new Timer(true); 78 public static final long fatCleanerDeley = 5000; 79 boolean recoverd = false; 80 public static boolean forceEveryEntry = false; 81 82 public MantaFileManager(String persistentName){ 83 this.persistentName = persistentName; 84 log=LogFactory.getLog("MantaFileManager"); 85 forceEveryEntry = MantaAgent.getInstance().getSingletonRepository() 86 .getConfigManager().getBooleanProperty("persistency.file.force_every_entry", false); 87 } 88 89 93 public MantaFileManager() { 94 fats = null; 95 cleaner = null; 96 } 97 98 public synchronized void recover(){ 99 if(recoverd){ 100 return; 101 } 102 int newOrder = 0; 103 File persistentDir = new File (PersistentConst.getPersistentDir(persistentName)); 104 File fatFiles[] =persistentDir.listFiles(); 105 Arrays.sort(fatFiles, new Comparator(){ 107 108 public int compare(Object o1, Object o2) { 109 File a = (File ) o1; 110 File b = (File ) o2; 111 return a.getName().compareTo(b.getName()); 112 } 113 114 }); 115 for (int i = 0; i < fatFiles.length; i++) { 117 File recovedFile =fatFiles[i]; 118 String fatName = recovedFile.getName(); 119 if(fatName.startsWith(persistentName+"_#")){ 120 String oldFileName = PersistentConst.getPersistentDir(persistentName)+File.separator+"old"+fatName; 121 File old = new File (oldFileName); 122 if(old.exists()){ 123 old.delete(); 124 } 125 boolean renamed = recovedFile.renameTo(old); 126 try { 127 if(renamed){ 128 MantaFatFile recoverd = new MantaFatFile(oldFileName); 129 int[] entries = recoverd.getFileList(); 130 if(entries.length >0){ 131 for (int j = 0; j < entries.length; j++) { 132 ByteBuffer buff = recoverd.load(entries[j]); 133 savePersistentBuffer(newOrder,buff); 134 newOrder++; 135 PersistentConst.getPersistentByteBufferPool().release(buff); 136 } 137 } 138 recoverd.close(); 139 recoverd = null; 140 recovedFile = null; 141 boolean b = old.delete(); 142 if(!b){ 143 FatCleanerTasks task = new FatCleanerTasks(old, 5); 144 cleaner.schedule(task, fatCleanerDeley); 145 } 146 147 148 } } catch (Throwable e) { 150 if(log.isErrorEnabled()) 151 log.error("Could not recover "+persistentName+" : file number"+i+".",e); 152 } 153 } } recoverd = true; 156 } 157 158 159 private MantaFatFile createFatFile(int FatFileIndex){ 160 MantaFatFile f = new MantaFatFile(PersistentConst.getPersistentDir(persistentName)+File.separator+persistentName+"_#"+FatFileIndex); 161 fats[FatFileIndex] = f; 162 return f; 163 } 164 165 private void deleteFatFile(int FatFileIndex) throws IOException { 166 String fileName =PersistentConst.getPersistentDir(persistentName)+File.separator+persistentName+"_#"+FatFileIndex; 167 if(fats[FatFileIndex]!= null){ 168 fats[FatFileIndex].close(); 169 fats[FatFileIndex]= null; 170 File f = new File (fileName); 171 f.delete(); 172 173 } 174 } 175 176 private int getFatFileNumByEntry(int entryIndex ){ 177 return (int)Math.floor(entryIndex/(MantaFatFile.MAX_FILES_IN_FAT-1 )) ; 178 179 } 180 181 private int getEntryIndexInFAT(int entryIndex,int FATNamber){ 182 int result = (entryIndex-(FATNamber*(MantaFatFile.MAX_FILES_IN_FAT-1)))+1; 183 return result; 184 } 185 186 187 188 191 public synchronized int[] getKeys() { 192 LinkedList list = new LinkedList(); 193 for (int i = 0; i < fats.length; i++) { 194 MantaFatFile file = fats[i]; 195 if(file != null){ 196 int entries[] =file.getFileList(); 197 for (int j = 0; j < entries.length; j++) { 198 list.add(new Integer ((entries[j]+(i*(MantaFatFile.MAX_FILES_IN_FAT-1)))-1)); 199 } 200 201 } 202 } 203 int[] result = new int[list.size()]; 204 for (int i = 0; i < result.length; i++) { 205 result[i] = ((Integer )list.get(i)).intValue(); 206 } 207 return result; 208 } 209 210 ByteableInputStream in = new ByteableInputStream(); 211 214 public synchronized Object getPersistentObject(int i) { 215 Byteable result = null; 216 217 try{ 218 ByteBuffer buff = getPersistentBuffer( i); 219 220 in.setUnderLine(buff); 221 result = in.readByteable(); 222 PersistentConst.getPersistentByteBufferPool().release(buff); 223 }catch(Throwable t){ 225 if(log.isErrorEnabled()) 226 log.error("Could not recover "+persistentName+" : "+i+".",t); 227 } 228 return result; 229 } 230 231 234 public synchronized void deletePersistentObject(int i) { 235 int fileNum = getFatFileNumByEntry(i); 236 MantaFatFile fat = fats[fileNum]; 237 if(fat != null){ 238 int entryIndexInFat = getEntryIndexInFAT(i,fileNum); fat.delete(entryIndexInFat); 240 } 241 242 } 243 244 247 public synchronized void savePersistentObject(int entryName, Byteable entry) throws IOException { 248 ByteableOutputStream out = new ByteableOutputStream(PersistentConst.getPersistentByteBufferPool()); 249 out.writeByteable(entry); 250 savePersistentBuffer(entryName,out.getByteBuffer()); 251 out.release(); 252 253 } 254 255 258 public synchronized void clearStorage() throws IOException { 259 for (int i = 0; i < fats.length; i++) { 260 if(fats!= null){ 261 deleteFatFile(i); 262 } 263 264 } 265 266 } 267 268 271 public synchronized void savePersistentBuffer(int entryName, ByteBuffer byteBuffer) throws IOException { 272 273 int fatNum = getFatFileNumByEntry(entryName); 274 MantaFatFile fat = fats[fatNum]; 275 if(fat == null){ 276 fat = createFatFile(fatNum); 277 } 278 279 int entryIndexInFat = getEntryIndexInFAT(entryName,fatNum ); 281 fat.save(entryIndexInFat , byteBuffer); 282 283 } 284 285 288 public synchronized ByteBuffer getPersistentBuffer(int i) { 289 int fileNum = getFatFileNumByEntry(i); 290 MantaFatFile fat = fats[fileNum]; 291 int entryIndexInFat = getEntryIndexInFAT(i,fileNum ); ByteBuffer buff = fat.load(entryIndexInFat); 293 return buff; 294 } 295 296 299 public String getStorageName() { 300 301 return persistentName; 302 } 303 304 public String [] getAllServices() { 305 HashSet services = new HashSet(); 306 String persistentDir = MantaAgent.getInstance().getSingletonRepository(). 307 getConfigManager().getStringProperty("persistency.file.persistent_folder","./persistent"); 308 String delimiter = MantaAgent.getInstance().getSingletonRepository(). 309 getConfigManager().getStringProperty("persistency.hierarchy_delimiter","~"); 310 File dir = new File (persistentDir); 311 312 String list[] = dir.list(new FilenameFilter () { 313 public boolean accept(File dir,String file) { 314 return file.startsWith(PersistentManager.SUBSCRIBERS_PERSISTENT_PREFIX); 315 } 316 }); 317 318 for (int i = 0 ; i < list.length; i++) { 319 services.add(StringUtils.replace(list[i] 320 .substring(PersistentManager.SUBSCRIBERS_PERSISTENT_PREFIX.length(), 321 list[i].lastIndexOf("_#")) 322 ,delimiter, VirtualTopicManager.HIERARCHICAL_TOPIC_DELIMITER)); 323 } 324 325 return (String []) services.toArray(list); 326 } 327 328 } 329 | Popular Tags |