1 18 package org.apache.activemq.kaha.impl.async; 19 20 import java.io.IOException ; 21 import java.io.RandomAccessFile ; 22 import java.util.concurrent.ConcurrentHashMap ; 23 24 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand; 25 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey; 26 import org.apache.activemq.util.ByteSequence; 27 33 final class DataFileAccessor { 34 35 private final DataFile dataFile; 36 private final ConcurrentHashMap <WriteKey, WriteCommand> inflightWrites; 37 private final RandomAccessFile file; 38 private boolean disposed; 39 40 46 public DataFileAccessor(AsyncDataManager dataManager, DataFile dataFile) throws IOException { 47 this.dataFile = dataFile; 48 this.inflightWrites = dataManager.getInflightWrites(); 49 this.file = dataFile.openRandomAccessFile(false); 50 } 51 52 public DataFile getDataFile() { 53 return dataFile; 54 } 55 56 public void dispose() { 57 if( disposed ) 58 return; 59 disposed=true; 60 try { 61 dataFile.closeRandomAccessFile(file); 62 } catch (IOException e) { 63 e.printStackTrace(); 64 } 65 } 66 67 public ByteSequence readRecord(Location location) throws IOException { 68 69 if( !location.isValid() ) 70 throw new IOException ("Invalid location: "+location); 71 72 WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location)); 73 if( asyncWrite!= null ) { 74 return asyncWrite.data; 75 } 76 77 try { 78 79 if( location.getSize()==Location.NOT_SET ) { 80 file.seek(location.getOffset()); 81 location.setSize(file.readInt()); 82 file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_SPACE); 83 } else { 84 file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_SPACE); 85 } 86 87 byte[] data=new byte[location.getSize()-AsyncDataManager.ITEM_HEAD_FOOT_SPACE]; 88 file.readFully(data); 89 return new ByteSequence(data, 0, data.length); 90 91 } catch (RuntimeException e) { 92 throw new IOException ("Invalid location: "+location+", : "+e); 93 } 94 } 95 96 public void readLocationDetails(Location location) throws IOException { 97 WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location)); 98 if( asyncWrite!= null ) { 99 location.setSize(asyncWrite.location.getSize()); 100 location.setType(asyncWrite.location.getType()); 101 } else { 102 file.seek(location.getOffset()); 103 location.setSize(file.readInt()); 104 location.setType(file.readByte()); 105 } 106 } 107 108 public boolean readLocationDetailsAndValidate(Location location) { 109 try { 110 WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location)); 111 if( asyncWrite!= null ) { 112 location.setSize(asyncWrite.location.getSize()); 113 location.setType(asyncWrite.location.getType()); 114 } else { 115 file.seek(location.getOffset()); 116 location.setSize(file.readInt()); 117 location.setType(file.readByte()); 118 119 byte data[] = new byte[3]; 120 file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_OFFSET_TO_SOR); 121 file.readFully(data); 122 if( data[0] != AsyncDataManager.ITEM_HEAD_SOR[0] || 123 data[1] != AsyncDataManager.ITEM_HEAD_SOR[1] || 124 data[2] != AsyncDataManager.ITEM_HEAD_SOR[2] ) { 125 return false; 126 } 127 file.seek(location.getOffset()+location.getSize()-AsyncDataManager.ITEM_FOOT_SPACE); 128 file.readFully(data); 129 if( data[0] != AsyncDataManager.ITEM_HEAD_EOR[0] || 130 data[1] != AsyncDataManager.ITEM_HEAD_EOR[1] || 131 data[2] != AsyncDataManager.ITEM_HEAD_EOR[2] ) { 132 return false; 133 } 134 } 135 } catch (IOException e) { 136 return false; 137 } 138 return true; 139 } 140 141 public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException { 142 143 file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_SPACE); 144 int size = Math.min(data.getLength(), location.getSize()); 145 file.write(data.getData(), data.getOffset(), size); 146 if( sync ) { 147 file.getFD().sync(); 148 } 149 150 } 151 152 } 153 | Popular Tags |