1 24 package org.objectweb.jalisto.se.storage.raf.log.asynraf; 25 26 import org.objectweb.jalisto.se.api.internal.JalistoObject; 27 import org.objectweb.jalisto.se.api.JalistoProperties; 28 import org.objectweb.jalisto.se.exception.JalistoException; 29 import org.objectweb.jalisto.se.impl.InFileAddress; 30 import org.objectweb.jalisto.se.impl.trace.Trace; 31 import org.objectweb.jalisto.se.storage.raf.DbByteArrayOutputStream; 32 import org.objectweb.jalisto.se.storage.raf.RecordReader; 33 import org.objectweb.jalisto.se.storage.raf.RecordWriter; 34 35 import java.io.EOFException ; 36 import java.io.IOException ; 37 import java.io.RandomAccessFile ; 38 import java.util.*; 39 40 public class AsynrafLogger { 41 42 public AsynrafLogger(JalistoProperties properties, PhysicalFileAccessLogAsynrafImpl physical) 43 throws IOException { 44 this.physical = physical; 45 this.trace = physical.getTrace(); 46 infos = new HashMap(); 47 demon = new LogDemon(this, physical); 48 file = new RandomAccessFile (properties.getLogFileName(), "rw"); 49 file.seek(0); 50 currentWriteAddress = 0; 51 maxKeyLength = properties.getKeyLength(); 52 } 53 54 public Object [] getAddressesForExtent(String filter) { 55 Object [] result = new Object [2]; 56 ArrayList toRemove = new ArrayList(); 57 ArrayList toAdd = new ArrayList(); 58 result[0] = toAdd; 59 result[1] = toRemove; 60 synchronized (infos) { 61 Iterator addresses = infos.keySet().iterator(); 62 while (addresses.hasNext()) { 63 String address = (String ) addresses.next(); 64 if (address.startsWith(filter)) { 65 LogInfo logInfo = (LogInfo) infos.get(address); 66 if (logInfo.getActionType() == LogInfo.INSERT_TYPE) { 67 toAdd.add(address); 68 } else if (logInfo.getActionType() == LogInfo.DELETE_TYPE) { 69 toRemove.add(address); 70 } 71 } 72 } 73 } 74 return result; 75 } 76 77 public void insertFileObject(InFileAddress ifa, JalistoObject fo) { 78 if (!isCommitting) { 79 throw new JalistoException("access minilog outside commit process"); 80 } 81 try { 82 synchronized (file) { 83 RecordWriter rw = new RecordWriter(ifa.getAddress()); 84 rw.writeObject(fo); 85 indexCounter++; 86 LogInfo info = new LogInfo(LogInfo.INSERT_TYPE, ifa.getClone(), indexCounter); 87 infos.put(ifa.getAddress(), info); 88 writeType(LogInfo.INSERT_TYPE); 89 writeIfa(ifa); 90 writeLong(rw.getDataLength()); 91 info.setValueAddress(currentWriteAddress); 92 info.setValueLength(rw.getDataLength()); 93 file.seek(currentWriteAddress); 94 rw.writeTo(file); 95 currentWriteAddress = file.getFilePointer(); 96 rw.reset(); 97 } 98 } catch (Exception e) { 99 throw new JalistoException("could not insert object", e); 100 } 101 } 102 103 public JalistoObject readFileObjectAt(InFileAddress ifa) { 104 try { 105 RecordReader rr; 106 try { 107 LogInfo info = (LogInfo) infos.get(ifa.getAddress()); 108 if (info != null) { 109 if (info.getActionType() == LogInfo.DELETE_TYPE) { 110 throw new JalistoException("this object has been deleted : " + ifa); 111 } 112 byte[] datas = new byte[(int) info.getValueLength()]; 113 synchronized (file) { 114 file.seek(info.getValueAddress()); 115 file.readFully(datas); 116 } 117 rr = new RecordReader(ifa.getAddress(), datas); 118 } else { 119 rr = physical.internalRead(ifa); 120 } 121 } catch (EOFException eofe) { 122 rr = physical.internalRead(ifa); 123 } 124 JalistoObject result = (JalistoObject) rr.readObject(); 125 result.setIfa(ifa); 126 rr.reset(); 127 return result; 128 } catch (Exception e) { 129 throw new JalistoException(e); 130 } 131 } 132 133 public void updateFileObject(InFileAddress ifa, JalistoObject fo) { 134 if (!isCommitting) { 135 throw new JalistoException("access minilog outside commit process"); 136 } 137 try { 138 synchronized (file) { 139 RecordWriter rw = new RecordWriter(ifa.getAddress()); 140 rw.writeObject(fo); 141 indexCounter++; 142 LogInfo info = new LogInfo(LogInfo.UPDATE_TYPE, ifa.getClone(), indexCounter); 143 infos.put(ifa.getAddress(), info); 144 writeType(LogInfo.UPDATE_TYPE); 145 writeIfa(ifa); 146 writeLong(rw.getDataLength()); 147 info.setValueAddress(currentWriteAddress); 148 info.setValueLength(rw.getDataLength()); 149 file.seek(currentWriteAddress); 150 rw.writeTo(file); 151 rw.reset(); 152 currentWriteAddress = file.getFilePointer(); 153 RecordReader rr = physical.internalRead(ifa); 154 writeLong(rr.getData().length); 155 info.setOldValueAddress(currentWriteAddress); 156 info.setOldValueLength(rr.getData().length); 157 file.seek(currentWriteAddress); 158 file.write(rr.getData()); 159 rr.reset(); 160 currentWriteAddress = file.getFilePointer(); 161 } 162 } catch (Exception e) { 163 throw new JalistoException("could not update object", e); 164 } 165 } 166 167 public void deleteFileObject(InFileAddress ifa) { 168 if (!isCommitting) { 169 throw new JalistoException("access minilog outside commit process"); 170 } 171 try { 172 synchronized (file) { 173 indexCounter++; 174 LogInfo info = new LogInfo(LogInfo.DELETE_TYPE, ifa.getClone(), indexCounter); 175 infos.put(ifa.getAddress(), info); 176 writeType(LogInfo.DELETE_TYPE); 177 writeIfa(ifa); 178 RecordReader rr = physical.internalRead(ifa); 179 writeLong(rr.getData().length); 180 info.setOldValueAddress(currentWriteAddress); 181 info.setOldValueLength(rr.getData().length); 182 file.seek(currentWriteAddress); 183 file.write(rr.getData()); 184 rr.reset(); 185 currentWriteAddress = file.getFilePointer(); 186 } 187 } catch (Exception e) { 188 throw new JalistoException("could not delete object", e); 189 } 190 } 191 192 public void open() { 193 demon.open(); 194 } 195 196 public void close() { 197 while (!infos.isEmpty()) { 198 Thread.yield(); 199 } 200 demon.close(); 201 } 202 203 public void startCommitting() throws IOException { 204 while (demon.isWorking()) { 205 Thread.yield(); 206 } 207 isCommitting = true; 208 indexCounter = -1; 209 writeStatus(LOGGER_WORKING); 210 } 211 212 public void finishCommitting() throws IOException { 213 writeStatus(DEMON_WORKING); 214 isCommitting = false; 215 demon.setWorking(); 216 } 217 218 public boolean isCommitting() { 219 return isCommitting; 220 } 221 222 public RandomAccessFile getFile() { 223 return file; 224 } 225 226 public List getSortedInfos() { 227 if (isCommitting) { 228 throw new JalistoException("AsynrafLogger is committing"); 229 } 230 List sortedInfos = new ArrayList(); 231 sortedInfos.addAll(infos.values()); 233 Collections.sort(sortedInfos); 235 return sortedInfos; 236 } 237 238 public void removeInfo(LogInfo info) { 239 if (isCommitting) { 240 throw new JalistoException("AsynrafLogger is committing"); 241 } 242 infos.remove(info.getIfa().getAddress()); 243 } 244 245 248 249 private void writeType(short type) throws IOException { 250 file.seek(currentWriteAddress); 251 file.writeShort(type); 252 currentWriteAddress = file.getFilePointer(); 253 } 254 255 public void writeStatus(short status) throws IOException { 256 synchronized (file) { 257 file.seek(0); 258 file.writeShort(status); 259 currentWriteAddress = file.getFilePointer(); 260 } 261 } 262 263 private void writeLong(long value) throws IOException { 264 file.seek(currentWriteAddress); 265 file.writeLong(value); 266 currentWriteAddress = file.getFilePointer(); 267 } 268 269 private void writeIfa(InFileAddress ifa) throws IOException { 270 String key = ifa.getAddress(); 271 DbByteArrayOutputStream temp = DbByteArrayOutputStream.getInstance(); 272 temp.getOutput().writeUTF(key); 273 274 if (temp.size() > maxKeyLength) { 275 throw new JalistoException("Key " + key + " of size " + temp.size() + 276 " is larger than permitted size of " + 277 maxKeyLength + " bytes"); 278 } 279 file.seek(currentWriteAddress); 280 temp.writeTo(file); 281 temp.reset(); 282 file.writeInt(ifa.getFileIndex()); 283 file.writeInt(ifa.getIndex()); 284 currentWriteAddress = file.getFilePointer(); 285 } 286 287 public byte[] readBytes(long address, int size) throws IOException { 288 synchronized (file) { 289 file.seek(address); 290 byte[] result = new byte[size]; 291 file.readFully(result); 292 return result; 293 } 294 } 295 296 public void setLength(long length) throws IOException { 297 synchronized (file) { 298 file.setLength(length); 299 } 300 } 301 302 303 private PhysicalFileAccessLogAsynrafImpl physical; 304 private RandomAccessFile file; 305 private HashMap infos; 306 private LogDemon demon; 307 private long currentWriteAddress; 308 private int maxKeyLength; 309 private Trace trace; 310 private boolean isCommitting = false; 311 312 private int indexCounter; 313 314 public final static short LOGGER_WORKING = 32; 315 public final static short DEMON_WORKING = 478; 316 public final static short NOBODY_WORKING = 174; 317 } 318 | Popular Tags |