1 18 package org.apache.activemq.kaha.impl.async; 19 20 import java.io.IOException ; 21 import java.io.RandomAccessFile ; 22 import java.nio.ByteBuffer ; 23 import java.nio.channels.FileChannel ; 24 25 31 class NIODataFileAppender extends DataFileAppender { 32 33 public NIODataFileAppender(AsyncDataManager fileManager) { 34 super(fileManager); 35 } 36 37 47 protected void processQueue() { 48 DataFile dataFile=null; 49 RandomAccessFile file=null; 50 FileChannel channel=null; 51 52 try { 53 54 ByteBuffer header = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_HEAD_SPACE); 55 ByteBuffer footer = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_FOOT_SPACE); 56 ByteBuffer buffer = ByteBuffer.allocateDirect(MAX_WRITE_BATCH_SIZE); 57 58 header.putInt(0); header.put((byte) 0); header.put(RESERVED_SPACE); header.put(AsyncDataManager.ITEM_HEAD_SOR); 63 footer.put(AsyncDataManager.ITEM_HEAD_EOR); 64 65 while( true ) { 66 67 Object o = null; 68 69 synchronized(enqueueMutex) { 71 while( true ) { 72 if( shutdown ) { 73 o = SHUTDOWN_COMMAND; 74 break; 75 } 76 if( nextWriteBatch!=null ) { 77 o = nextWriteBatch; 78 nextWriteBatch=null; 79 break; 80 } 81 enqueueMutex.wait(); 82 } 83 enqueueMutex.notify(); 84 } 85 86 87 if( o == SHUTDOWN_COMMAND ) { 88 break; 89 } 90 91 WriteBatch wb = (WriteBatch) o; 92 if( dataFile != wb.dataFile ) { 93 if( file!=null ) { 94 dataFile.closeRandomAccessFile(file); 95 } 96 dataFile = wb.dataFile; 97 file = dataFile.openRandomAccessFile(true); 98 channel = file.getChannel(); 99 } 100 101 WriteCommand write = wb.first; 102 103 file.seek(write.location.getOffset()); 107 108 if( wb.size == write.location.getSize() ) { 111 112 header.clear(); 113 header.putInt(write.location.getSize()); 114 header.put(write.location.getType()); 115 header.clear(); 116 transfer(header, channel); 117 ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength()); 118 transfer(source, channel); 119 footer.clear(); 120 transfer(footer, channel); 121 122 } else { 123 124 while( write!=null ) { 126 127 header.clear(); 128 header.putInt(write.location.getSize()); 129 header.put(write.location.getType()); 130 header.clear(); 131 copy(header, buffer); 132 assert !header.hasRemaining(); 133 134 ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength()); 135 copy(source, buffer); 136 assert !source.hasRemaining(); 137 138 footer.clear(); 139 copy(footer, buffer); 140 assert !footer.hasRemaining(); 141 142 write = (WriteCommand) write.getNext(); 143 } 144 145 buffer.flip(); 147 transfer(buffer, channel); 148 buffer.clear(); 149 } 150 151 file.getChannel().force(false); 152 153 WriteCommand lastWrite = (WriteCommand) wb.first.getTailNode(); 154 dataManager.setLastAppendLocation( lastWrite.location ); 155 156 if( wb.latch!=null ) { 158 wb.latch.countDown(); 159 } 160 161 write = wb.first; 164 while( write!=null ) { 165 if( !write.sync ) { 166 inflightWrites.remove(new WriteKey(write.location)); 167 } 168 write = (WriteCommand) write.getNext(); 169 } 170 } 171 172 } catch (IOException e) { 173 synchronized( enqueueMutex ) { 174 firstAsyncException = e; 175 } 176 } catch (InterruptedException e) { 177 } finally { 178 try { 179 if( file!=null ) { 180 dataFile.closeRandomAccessFile(file); 181 } 182 } catch (IOException e) { 183 } 184 shutdownDone.countDown(); 185 } 186 } 187 188 194 private void transfer(ByteBuffer header, FileChannel channel) throws IOException { 195 while (header.hasRemaining()) { 196 channel.write(header); 197 } 198 } 199 200 private int copy(ByteBuffer src, ByteBuffer dest) { 201 int rc = Math.min(dest.remaining(), src.remaining()); 202 if( rc > 0 ) { 203 int limit = src.limit(); 205 src.limit(src.position()+rc); 206 dest.put(src); 207 src.limit(limit); 209 } 210 return rc; 211 } 212 213 } 214 | Popular Tags |