package org.apache.activemq.kaha.impl.async;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.LinkedNode;

/**
 * Appends records to data files on behalf of an {@link AsyncDataManager}.
 * <p>
 * Callers hand records to {@link #storeItem(ByteSequence, byte, boolean)}. A
 * single background thread, started lazily by the first write, collects them
 * into {@link WriteBatch}es, writes each batch to its data file and forces it
 * to disk. Synchronous callers block until their batch has been forced to
 * disk; asynchronous callers return immediately and their pending data stays
 * reachable through the manager's in-flight write map until it is on disk.
 */
class DataFileAppender {

    /** Zero-filled bytes written into the reserved part of every record header. */
    protected static final byte[] RESERVED_SPACE = new byte[AsyncDataManager.ITEM_HEAD_RESERVED_SPACE];

    /** Retained for compatibility with subclasses; the writer loop no longer needs a sentinel. */
    protected static final String SHUTDOWN_COMMAND = "SHUTDOWN";

    /**
     * Upper bound, in bytes, on the accumulated size of a batch. Also passed
     * to the constructor of the writer thread's staging buffer.
     */
    int MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;

    /**
     * Map key identifying a write by the data file id and offset of its location.
     */
    static public class WriteKey {
        private final int file;
        private final long offset;
        private final int hash;

        /**
         * @param item location whose data file id and offset identify the write
         */
        public WriteKey(Location item) {
            file = item.getDataFileId();
            offset = item.getOffset();
            // Computed once: the key is immutable and used as a hash map key.
            hash = (int) (file ^ offset);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        /**
         * Two keys are equal when they refer to the same file id and offset.
         * Returns {@code false} (rather than throwing) for {@code null} and
         * for objects of any other type.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof WriteKey)) {
                return false;
            }
            WriteKey other = (WriteKey) obj;
            return other.file == file && other.offset == offset;
        }
    }

    /**
     * A chain of write commands targeting the same data file that the writer
     * thread writes and forces to disk together.
     */
    public class WriteBatch {

        public final DataFile dataFile;
        /** Head of the linked chain of commands in this batch. */
        public final WriteCommand first;
        /** Released once the batch has been forced to disk, or has failed. */
        public final CountDownLatch latch = new CountDownLatch(1);
        /** Sum of the location sizes of all commands in the batch. */
        public int size;
        /**
         * Set (before {@link #latch} is released) when the batch could not be
         * written; {@code null} when the batch succeeded or is still pending.
         */
        public volatile IOException exception;

        public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
            this.dataFile = dataFile;
            this.first = write;
            size += write.location.getSize();
        }

        /**
         * @return {@code true} if {@code write} targets this batch's data file
         *         and adding it keeps the batch below {@code MAX_WRITE_BATCH_SIZE}
         */
        public boolean canAppend(DataFile dataFile, WriteCommand write) {
            if (dataFile != this.dataFile) {
                return false;
            }
            if (size + write.location.getSize() >= MAX_WRITE_BATCH_SIZE) {
                return false;
            }
            return true;
        }

        /** Links {@code write} to the end of the chain and accounts for its size. */
        public void append(WriteCommand write) throws IOException {
            this.first.getTailNode().linkAfter(write);
            size += write.location.getSize();
        }
    }

    /**
     * A single record waiting to be written: its target location, its payload
     * and whether the caller is waiting synchronously for it.
     */
    public static class WriteCommand extends LinkedNode {
        public final Location location;
        public final ByteSequence data;
        final boolean sync;

        public WriteCommand(Location location, ByteSequence data, boolean sync) {
            this.location = location;
            this.data = data;
            this.sync = sync;
        }
    }

    protected final AsyncDataManager dataManager;

    /** Asynchronous writes that have been accepted but not yet forced to disk. */
    protected final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites;

    /** Guards {@link #nextWriteBatch}, {@link #shutdown}, {@code running} and {@link #firstAsyncException}. */
    protected final Object enqueueMutex = new Object();
    /** The single batch currently being filled; {@code null} once the writer thread has taken it. */
    protected WriteBatch nextWriteBatch;

    private boolean running;
    protected boolean shutdown;
    /** Failure that terminated the writer thread; rethrown to every later write. */
    protected IOException firstAsyncException;
    /** Released when the writer thread has exited (or was never started and {@link #close()} ran). */
    protected final CountDownLatch shutdownDone = new CountDownLatch(1);
    private Thread thread;

    /**
     * Creates an appender that shares the in-flight write map of {@code dataManager}.
     *
     * @param dataManager the manager that allocates locations and tracks the last append location
     */
    public DataFileAppender(AsyncDataManager dataManager) {
        this.dataManager = dataManager;
        this.inflightWrites = this.dataManager.getInflightWrites();
    }

    /**
     * Queues a record for writing and returns the location allocated for it.
     *
     * @param data the record payload
     * @param type the record type stored in the record header
     * @param sync {@code true} to block until the record's batch has been
     *             forced to disk; {@code false} to return as soon as the
     *             record is queued
     * @return the location allocated for the record
     * @throws IOException if the appender has been closed, if the writer
     *             thread has failed, if a synchronous write could not be
     *             written, or ({@link InterruptedIOException}) if the calling
     *             thread is interrupted while waiting
     */
    public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {

        // The stored size covers the payload plus the fixed header/footer overhead.
        int size = data.getLength() + AsyncDataManager.ITEM_HEAD_FOOT_SPACE;

        final Location location = new Location();
        location.setSize(size);
        location.setType(type);

        WriteBatch batch;
        WriteCommand write = new WriteCommand(location, data, sync);

        // Allocation and enqueueing happen under one lock so that writes are
        // queued in the same order in which their locations were allocated.
        synchronized (this) {
            DataFile dataFile = dataManager.allocateLocation(location);

            // Register the async write BEFORE handing it to the writer thread.
            // Otherwise the writer could finish the batch and try to remove
            // the entry before it was put, leaving a stale entry behind.
            // The key is built after allocateLocation(), as the original code
            // did - presumably that call assigns the file id and offset.
            WriteKey key = null;
            if (!sync) {
                key = new WriteKey(location);
                inflightWrites.put(key, write);
            }

            boolean enqueued = false;
            try {
                batch = enqueue(dataFile, write);
                enqueued = true;
            } finally {
                // A rejected write will never be flushed, so it must not stay registered.
                if (!enqueued && key != null) {
                    inflightWrites.remove(key);
                }
            }
        }
        location.setLatch(batch.latch);
        if (sync) {
            try {
                batch.latch.await();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }
            // The latch is also released when the batch failed; surface that failure.
            IOException failure = batch.exception;
            if (failure != null) {
                throw failure;
            }
        }

        return location;
    }

    /**
     * Adds {@code write} to the batch being filled, starting the writer thread
     * on first use. If the current batch cannot take the write, blocks until
     * the writer thread has taken that batch and then starts a new one.
     *
     * @return the batch that now contains {@code write}
     * @throws IOException if the appender is shut down, the writer thread has
     *             failed, or the calling thread is interrupted while waiting
     */
    private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException {
        synchronized (enqueueMutex) {
            if (shutdown) {
                throw new IOException("Async Writter Thread Shutdown");
            }
            if (firstAsyncException != null) {
                throw firstAsyncException;
            }

            if (!running) {
                running = true;
                thread = new Thread() {
                    @Override
                    public void run() {
                        processQueue();
                    }
                };
                thread.setPriority(Thread.MAX_PRIORITY);
                thread.setDaemon(true);
                thread.setName("ActiveMQ Data File Writer");
                thread.start();
            }

            if (nextWriteBatch != null) {
                if (nextWriteBatch.canAppend(dataFile, write)) {
                    nextWriteBatch.append(write);
                    return nextWriteBatch;
                }

                // The pending batch is full or targets another file: wait for
                // the writer thread to take it (or to abort and clear it).
                try {
                    while (nextWriteBatch != null) {
                        enqueueMutex.wait();
                    }
                } catch (InterruptedException e) {
                    throw new InterruptedIOException();
                }
                if (shutdown) {
                    throw new IOException("Async Writter Thread Shutdown");
                }
                // The writer thread may have died while we were waiting.
                if (firstAsyncException != null) {
                    throw firstAsyncException;
                }
            }

            // Start a fresh batch and wake the writer thread.
            nextWriteBatch = new WriteBatch(dataFile, write);
            enqueueMutex.notify();
            return nextWriteBatch;
        }
    }

    /**
     * Stops accepting writes and waits for the writer thread to exit. A batch
     * that was already queued is still written before the thread exits.
     *
     * @throws IOException ({@link InterruptedIOException}) if the calling
     *             thread is interrupted while waiting
     */
    public void close() throws IOException {
        synchronized (enqueueMutex) {
            if (!shutdown) {
                shutdown = true;
                if (running) {
                    enqueueMutex.notifyAll();
                } else {
                    // No writer thread was ever started, so nothing else will release the latch.
                    shutdownDone.countDown();
                }
            }
        }

        try {
            shutdownDone.await();
        } catch (InterruptedException e) {
            throw new InterruptedIOException();
        }
    }

    /**
     * Body of the writer thread. Repeatedly takes the pending batch, writes it
     * to its data file, forces it to disk, publishes the last append location
     * and releases the batch's waiters. Exits when the appender is shut down
     * and no batch is pending, or when writing fails; in the latter case the
     * failure is recorded and every affected waiter is released.
     */
    protected void processQueue() {
        DataFile dataFile = null;
        RandomAccessFile file = null;
        // Batch currently being written; null between batches so that a late
        // failure is never attributed to a batch that already completed.
        WriteBatch current = null;
        try {

            DataByteArrayOutputStream buff = new DataByteArrayOutputStream(MAX_WRITE_BATCH_SIZE);
            while (true) {

                synchronized (enqueueMutex) {
                    // A pending batch takes precedence over shutdown so that
                    // writes accepted before close() are not dropped.
                    while (nextWriteBatch == null && !shutdown) {
                        enqueueMutex.wait();
                    }
                    current = nextWriteBatch;
                    nextWriteBatch = null;
                    // Wake a producer waiting for the batch slot to free up.
                    enqueueMutex.notify();
                }

                if (current == null) {
                    // Shut down with nothing left to write.
                    break;
                }

                if (dataFile != current.dataFile) {
                    if (file != null) {
                        // Forget the handle first so it is not closed a second
                        // time (against the wrong file) if anything below throws.
                        RandomAccessFile previous = file;
                        file = null;
                        dataFile.closeRandomAccessFile(previous);
                    }
                    dataFile = current.dataFile;
                    file = dataFile.openRandomAccessFile(true);
                }

                writeBatch(file, current, buff);
                file.getFD().sync();

                WriteCommand lastWrite = (WriteCommand) current.first.getTailNode();
                dataManager.setLastAppendLocation(lastWrite.location);

                // Release everybody waiting for this batch to reach the disk.
                current.latch.countDown();

                // The data is on disk now, so async writes no longer need to
                // be served from memory.
                for (WriteCommand write = current.first; write != null; write = (WriteCommand) write.getNext()) {
                    if (!write.sync) {
                        inflightWrites.remove(new WriteKey(write.location));
                    }
                }
                current = null;
            }

        } catch (IOException e) {
            abortPendingWrites(current, e);
        } catch (InterruptedException e) {
            // The thread is going away; make sure nobody keeps waiting on it
            // and that later writes fail fast instead of queueing forever.
            InterruptedIOException failure = new InterruptedIOException("Async Writer Thread Interrupted");
            failure.initCause(e);
            abortPendingWrites(current, failure);
        } finally {
            try {
                if (file != null) {
                    dataFile.closeRandomAccessFile(file);
                }
            } catch (IOException ignored) {
                // Best effort: the writer thread is terminating anyway.
            }
            shutdownDone.countDown();
        }
    }

    /**
     * Writes every command of {@code batch} at the offset of the first
     * command's location. Each record is laid out as: size (int), type (byte),
     * {@link #RESERVED_SPACE}, {@code ITEM_HEAD_SOR}, payload,
     * {@code ITEM_HEAD_EOR}. A batch holding a single command is written
     * straight to the file; larger batches are assembled in {@code buff} and
     * written with a single call.
     */
    private void writeBatch(RandomAccessFile file, WriteBatch batch, DataByteArrayOutputStream buff) throws IOException {
        WriteCommand write = batch.first;

        file.seek(write.location.getOffset());

        if (batch.size == write.location.getSize()) {
            // Only one command in the batch: skip the staging buffer.
            file.writeInt(write.location.getSize());
            file.writeByte(write.location.getType());
            file.write(RESERVED_SPACE);
            file.write(AsyncDataManager.ITEM_HEAD_SOR);
            file.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
            file.write(AsyncDataManager.ITEM_HEAD_EOR);
            return;
        }

        while (write != null) {
            buff.writeInt(write.location.getSize());
            buff.writeByte(write.location.getType());
            buff.write(RESERVED_SPACE);
            buff.write(AsyncDataManager.ITEM_HEAD_SOR);
            buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
            buff.write(AsyncDataManager.ITEM_HEAD_EOR);

            write = (WriteCommand) write.getNext();
        }

        ByteSequence sequence = buff.toByteSequence();
        file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
        buff.reset();
    }

    /**
     * Records {@code cause} as the failure of the writer thread, fails the
     * batch being written and the batch still queued, and wakes every thread
     * waiting on {@link #enqueueMutex}.
     */
    private void abortPendingWrites(WriteBatch inProgress, IOException cause) {
        synchronized (enqueueMutex) {
            firstAsyncException = cause;
            failBatch(inProgress, cause);
            failBatch(nextWriteBatch, cause);
            // Clear the slot so a producer waiting for it can observe the failure.
            nextWriteBatch = null;
            enqueueMutex.notifyAll();
        }
    }

    /** Marks {@code batch} as failed and releases its waiters; does nothing for {@code null}. */
    private static void failBatch(WriteBatch batch, IOException cause) {
        if (batch != null) {
            // Publish the failure before releasing the latch so waiters see it.
            batch.exception = cause;
            batch.latch.countDown();
        }
    }

}