package org.apache.activemq.kaha.impl.data;

import java.io.IOException;
import java.io.RandomAccessFile;

import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.util.DataByteArrayOutputStream;

/**
 * Writes data items straight to their data file from the calling thread.
 * <p>
 * Each item is first marshalled into a single reusable in-memory buffer and
 * then written to the file's {@link RandomAccessFile} in one call. The
 * {@code storeItem}, {@code updateItem} and {@code force} methods are
 * {@code synchronized} on this writer, which also guards the shared buffer.
 */
final public class SyncDataFileWriter {

    /** Scratch buffer reused for every item; only touched while holding this writer's monitor. */
    private final DataByteArrayOutputStream buffer;

    /** Manager used to locate the data file and offset for an item. */
    private final DataManagerImpl dataManager;

    /**
     * Creates a writer bound to the given data manager.
     *
     * @param fileManager the manager that owns the data files written by this instance
     */
    SyncDataFileWriter(DataManagerImpl fileManager) {
        this.dataManager = fileManager;
        this.buffer = new DataByteArrayOutputStream();
    }

    /**
     * Marshals a payload, asks the data manager for space, and writes the
     * item at the offset the manager assigned.
     *
     * @param marshaller serializes {@code payload} into the buffer
     * @param payload    the object to store
     * @param type       the item type byte written ahead of the payload size
     * @return a new {@link DataItem} whose size is the payload size and whose
     *         location was filled in by {@code dataManager.findSpaceForData}
     * @throws IOException if marshalling, seeking or writing fails
     */
    public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
        final int totalBytes = marshalToBuffer(marshaller, payload, type);

        // Ask the manager where this item will live; it receives the payload size only.
        final DataItem stored = new DataItem();
        stored.setSize(totalBytes - DataManagerImpl.ITEM_HEAD_SIZE);
        final DataFile target = dataManager.findSpaceForData(stored);

        // Copy the buffered bytes to the file at the item's offset.
        target.getRandomAccessFile().seek(stored.getOffset());
        target.getRandomAccessFile().write(buffer.getData(), 0, totalBytes);

        // Non-null writer data is what force() checks before syncing.
        target.setWriterData(Boolean.TRUE);

        dataManager.addInterestInFile(target);
        return stored;
    }

    /**
     * Re-marshals a payload and overwrites an existing item at its current offset.
     * The item's size is updated to the new payload size before the data file
     * is looked up.
     *
     * @param item       the existing item to overwrite; its size is modified
     * @param marshaller serializes {@code payload} into the buffer
     * @param payload    the replacement object
     * @param type       the item type byte written ahead of the payload size
     * @throws IOException if marshalling, seeking or writing fails
     */
    public synchronized void updateItem(DataItem item, Marshaller marshaller, Object payload, byte type)
            throws IOException {
        final int totalBytes = marshalToBuffer(marshaller, payload, type);
        item.setSize(totalBytes - DataManagerImpl.ITEM_HEAD_SIZE);

        final DataFile target = dataManager.getDataFile(item);
        final RandomAccessFile raf = target.getRandomAccessFile();
        raf.seek(item.getOffset());
        raf.write(buffer.getData(), 0, totalBytes);

        // Non-null writer data is what force() checks before syncing.
        target.setWriterData(Boolean.TRUE);
    }

    /**
     * Syncs the file descriptor of the given data file, but only when this
     * writer has marked it (non-null writer data) and the file reports itself
     * dirty. Both markers are cleared after the sync.
     * <p>
     * NOTE(review): this class never calls {@code setDirty(true)}; whether
     * {@code isDirty()} is true after an update-only write depends on
     * {@code DataFile} - confirm.
     *
     * @param dataFile the data file to sync
     * @throws IOException if the sync fails
     */
    public synchronized void force(DataFile dataFile) throws IOException {
        final boolean markedByWriter = dataFile.getWriterData() != null;
        if (markedByWriter && dataFile.isDirty()) {
            dataFile.getRandomAccessFile().getFD().sync();
            dataFile.setWriterData(null);
            dataFile.setDirty(false);
        }
    }

    /**
     * Does nothing; this writer performs no work on close.
     *
     * @throws IOException never thrown by this implementation
     */
    public void close() throws IOException {
    }

    /**
     * Fills the shared buffer with one item: the payload is marshalled first,
     * starting at position {@code ITEM_HEAD_SIZE}, and then the type byte and
     * payload size are written after a second reset (presumably landing at the
     * start of the buffer, in the space skipped earlier - confirm against
     * {@code DataByteArrayOutputStream}).
     * <p>
     * Callers must hold this writer's monitor.
     *
     * @return the buffer size reported right after the payload was written,
     *         i.e. the number of bytes the callers copy to the file
     */
    private int marshalToBuffer(Marshaller marshaller, Object payload, byte type) throws IOException {
        buffer.reset();
        buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
        marshaller.writePayload(payload, buffer);
        final int totalBytes = buffer.size();

        buffer.reset();
        buffer.writeByte(type);
        buffer.writeInt(totalBytes - DataManagerImpl.ITEM_HEAD_SIZE);
        return totalBytes;
    }
}