1 18 package org.apache.activemq.kaha.impl.data; 19 20 import java.io.File ; 21 import java.io.FilenameFilter ; 22 import java.io.IOException ; 23 import java.util.ArrayList ; 24 import java.util.HashMap ; 25 import java.util.Iterator ; 26 import java.util.List ; 27 import java.util.Map ; 28 29 import org.apache.activemq.kaha.Marshaller; 30 import org.apache.activemq.kaha.StoreLocation; 31 import org.apache.activemq.kaha.impl.DataManager; 32 import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem; 33 import org.apache.activemq.util.IOExceptionSupport; 34 import org.apache.commons.logging.Log; 35 import org.apache.commons.logging.LogFactory; 36 41 public final class DataManagerImpl implements DataManager { 42 43 private static final Log log=LogFactory.getLog(DataManagerImpl.class); 44 public static long MAX_FILE_LENGTH=1024*1024*32; 45 private static final String NAME_PREFIX="data-"; 46 private final File dir; 47 private final String name; 48 private SyncDataFileReader reader; 49 private SyncDataFileWriter writer; 50 private DataFile currentWriteFile; 51 private long maxFileLength = MAX_FILE_LENGTH; 52 Map fileMap=new HashMap (); 53 54 public static final int ITEM_HEAD_SIZE=5; public static final byte DATA_ITEM_TYPE=1; 56 public static final byte REDO_ITEM_TYPE=2; 57 58 Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER; 59 private String dataFilePrefix; 60 61 public DataManagerImpl(File dir, final String name){ 62 this.dir=dir; 63 this.name=name; 64 65 dataFilePrefix = NAME_PREFIX+name+"-"; 66 File [] files=dir.listFiles(new FilenameFilter (){ 68 public boolean accept(File dir,String n){ 69 return dir.equals(dir)&&n.startsWith(dataFilePrefix); 70 } 71 }); 72 if(files!=null){ 73 for(int i=0;i<files.length;i++){ 74 File file=files[i]; 75 String n=file.getName(); 76 String numStr=n.substring(dataFilePrefix.length(),n.length()); 77 int num=Integer.parseInt(numStr); 78 DataFile dataFile=new DataFile(file,num); 79 fileMap.put(dataFile.getNumber(),dataFile); 80 if(currentWriteFile==null||currentWriteFile.getNumber().intValue()<num){ 81 currentWriteFile=dataFile; 82 } 83 } 84 } 85 } 86 87 private DataFile createAndAddDataFile(int num){ 88 String fileName=dataFilePrefix+num; 89 File file=new File (dir,fileName); 90 DataFile result=new DataFile(file,num); 91 fileMap.put(result.getNumber(),result); 92 return result; 93 } 94 95 98 public String getName(){ 99 return name; 100 } 101 102 synchronized DataFile findSpaceForData(DataItem item) throws IOException { 103 if(currentWriteFile==null||((currentWriteFile.getLength()+item.getSize())>maxFileLength)){ 104 int nextNum=currentWriteFile!=null?currentWriteFile.getNumber().intValue()+1:1; 105 if(currentWriteFile!=null&¤tWriteFile.isUnused()){ 106 removeDataFile(currentWriteFile); 107 } 108 currentWriteFile=createAndAddDataFile(nextNum); 109 } 110 item.setOffset(currentWriteFile.getLength()); 111 item.setFile(currentWriteFile.getNumber().intValue()); 112 currentWriteFile.incrementLength(item.getSize()+ITEM_HEAD_SIZE); 113 return currentWriteFile; 114 } 115 116 DataFile getDataFile(StoreLocation item) throws IOException { 117 Integer key=new Integer (item.getFile()); 118 DataFile dataFile=(DataFile) fileMap.get(key); 119 if(dataFile==null){ 120 log.error("Looking for key " + key + " but not found in fileMap: " + fileMap); 121 throw new IOException ("Could not locate data file "+NAME_PREFIX+name+"-"+item.getFile()); 122 } 123 return dataFile; 124 } 125 126 129 public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException { 130 return getReader().readItem(marshaller,item); 131 } 132 133 136 public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException { 137 return getWriter().storeItem(marshaller,payload, DATA_ITEM_TYPE); 138 } 139 140 143 public synchronized StoreLocation storeRedoItem(Object payload) throws IOException { 144 return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE); 145 } 146 147 150 public synchronized void updateItem(StoreLocation location,Marshaller marshaller, Object payload) throws IOException { 151 getWriter().updateItem((DataItem)location,marshaller,payload,DATA_ITEM_TYPE); 152 } 153 154 157 public synchronized void recoverRedoItems(RedoListener listener) throws IOException { 158 159 if( currentWriteFile == null ) 161 return; 162 163 DataItem item = new DataItem(); 164 item.setFile(currentWriteFile.getNumber().intValue()); 165 item.setOffset(0); 166 while( true ) { 167 byte type; 168 try { 169 type = getReader().readDataItemSize(item); 170 } catch (IOException ignore) { 171 log.trace("End of data file reached at (header was invalid): "+item); 172 return; 173 } 174 if( type == REDO_ITEM_TYPE ) { 175 Object object; 177 try { 178 object = readItem(redoMarshaller, item); 179 } catch (IOException e1) { 180 log.trace("End of data file reached at (payload was invalid): "+item); 181 return; 182 } 183 try { 184 185 listener.onRedoItem(item, object); 186 item = item.copy(); 189 190 } catch (Exception e) { 191 throw IOExceptionSupport.create("Recovery handler failed: "+e,e); 192 } 193 } 194 item.setOffset(item.getOffset()+ITEM_HEAD_SIZE+item.getSize()); 196 } 197 } 198 199 202 public synchronized void close() throws IOException { 203 getWriter().close(); 204 for(Iterator i=fileMap.values().iterator();i.hasNext();){ 205 DataFile dataFile=(DataFile) i.next(); 206 getWriter().force(dataFile); 207 dataFile.close(); 208 } 209 fileMap.clear(); 210 } 211 212 215 public synchronized void force() throws IOException { 216 for(Iterator i=fileMap.values().iterator();i.hasNext();){ 217 DataFile dataFile=(DataFile) i.next(); 218 getWriter().force(dataFile); 219 } 220 } 221 222 223 226 public synchronized boolean delete() throws IOException { 227 boolean result=true; 228 for(Iterator i=fileMap.values().iterator();i.hasNext();){ 229 DataFile dataFile=(DataFile) i.next(); 230 result&=dataFile.delete(); 231 } 232 fileMap.clear(); 233 return result; 234 } 235 236 237 240 public synchronized void addInterestInFile(int file) throws IOException { 241 if(file>=0){ 242 Integer key=new Integer (file); 243 DataFile dataFile=(DataFile) fileMap.get(key); 244 if(dataFile==null){ 245 dataFile=createAndAddDataFile(file); 246 } 247 addInterestInFile(dataFile); 248 } 249 } 250 251 synchronized void addInterestInFile(DataFile dataFile){ 252 if(dataFile!=null){ 253 dataFile.increment(); 254 } 255 } 256 257 260 public synchronized void removeInterestInFile(int file) throws IOException { 261 if(file>=0){ 262 Integer key=new Integer (file); 263 DataFile dataFile=(DataFile) fileMap.get(key); 264 removeInterestInFile(dataFile); 265 } 266 } 267 268 synchronized void removeInterestInFile(DataFile dataFile) throws IOException { 269 if(dataFile!=null){ 270 if(dataFile.decrement()<=0){ 271 if(dataFile!=currentWriteFile){ 272 removeDataFile(dataFile); 273 } 274 } 275 } 276 } 277 278 281 public synchronized void consolidateDataFiles() throws IOException { 282 List purgeList=new ArrayList (); 283 for(Iterator i=fileMap.values().iterator();i.hasNext();){ 284 DataFile dataFile=(DataFile) i.next(); 285 if(dataFile.isUnused() && dataFile != currentWriteFile){ 286 purgeList.add(dataFile); 287 } 288 } 289 for(int i=0;i<purgeList.size();i++){ 290 DataFile dataFile=(DataFile) purgeList.get(i); 291 removeDataFile(dataFile); 292 } 293 } 294 295 private void removeDataFile(DataFile dataFile) throws IOException { 296 fileMap.remove(dataFile.getNumber()); 297 if(writer!=null){ 298 writer.force(dataFile); 299 } 300 boolean result=dataFile.delete(); 301 log.debug("discarding data file "+dataFile+(result?"successful ":"failed")); 302 } 303 304 307 public Marshaller getRedoMarshaller() { 308 return redoMarshaller; 309 } 310 311 314 public void setRedoMarshaller(Marshaller redoMarshaller) { 315 this.redoMarshaller = redoMarshaller; 316 } 317 318 321 public long getMaxFileLength(){ 322 return maxFileLength; 323 } 324 325 328 public void setMaxFileLength(long maxFileLength){ 329 this.maxFileLength=maxFileLength; 330 } 331 332 public String toString(){ 333 return "DataManager:("+NAME_PREFIX+name+")"; 334 } 335 336 public synchronized SyncDataFileReader getReader() { 337 if( reader == null ) { 338 reader = createReader(); 339 } 340 return reader; 341 } 342 protected synchronized SyncDataFileReader createReader() { 343 return new SyncDataFileReader(this); 344 } 345 public synchronized void setReader(SyncDataFileReader reader) { 346 this.reader = reader; 347 } 348 349 public synchronized SyncDataFileWriter getWriter() { 350 if( writer==null ) { 351 writer = createWriter(); 352 } 353 return writer; 354 } 355 private SyncDataFileWriter createWriter() { 356 return new SyncDataFileWriter(this); 357 } 358 public synchronized void setWriter(SyncDataFileWriter writer) { 359 this.writer = writer; 360 } 361 362 } 363 | Popular Tags |