1 18 package org.apache.activemq.kaha.impl.async; 19 20 import java.io.ByteArrayInputStream ; 21 import java.io.ByteArrayOutputStream ; 22 import java.io.DataInputStream ; 23 import java.io.DataOutputStream ; 24 import java.io.File ; 25 import java.io.FilenameFilter ; 26 import java.io.IOException ; 27 import java.util.ArrayList ; 28 import java.util.Collections ; 29 import java.util.HashMap ; 30 import java.util.HashSet ; 31 import java.util.Iterator ; 32 import java.util.List ; 33 import java.util.Map ; 34 import java.util.Set ; 35 import java.util.concurrent.ConcurrentHashMap ; 36 import java.util.concurrent.atomic.AtomicReference ; 37 38 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand; 39 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey; 40 import org.apache.activemq.thread.Scheduler; 41 import org.apache.activemq.util.ByteSequence; 42 import org.apache.commons.logging.Log; 43 import org.apache.commons.logging.LogFactory; 44 45 50 public final class AsyncDataManager { 51 52 private static final Log log=LogFactory.getLog(AsyncDataManager.class); 53 54 public static int CONTROL_RECORD_MAX_LENGTH=1024; 55 56 public static final int ITEM_HEAD_RESERVED_SPACE=21; 57 public static final int ITEM_HEAD_SPACE=4+1+ITEM_HEAD_RESERVED_SPACE+3; 59 public static final int ITEM_HEAD_OFFSET_TO_SOR=ITEM_HEAD_SPACE-3; 60 public static final int ITEM_FOOT_SPACE=3; 62 public static final int ITEM_HEAD_FOOT_SPACE=ITEM_HEAD_SPACE+ITEM_FOOT_SPACE; 63 64 public static final byte[] ITEM_HEAD_SOR=new byte[]{'S', 'O', 'R'}; public static final byte[] ITEM_HEAD_EOR=new byte[]{'E', 'O', 'R'}; 67 public static final byte DATA_ITEM_TYPE=1; 68 public static final byte REDO_ITEM_TYPE=2; 69 70 public static String DEFAULT_DIRECTORY="data"; 71 public static String DEFAULT_FILE_PREFIX="data-"; 72 public static int DEFAULT_MAX_FILE_LENGTH=1024*1024*32; 73 74 private File directory = new File (DEFAULT_DIRECTORY); 75 private String filePrefix=DEFAULT_FILE_PREFIX; 76 private int maxFileLength = DEFAULT_MAX_FILE_LENGTH; 77 private int preferedFileLength = DEFAULT_MAX_FILE_LENGTH-1024*512; 78 79 private DataFileAppender appender; 80 private DataFileAccessorPool accessorPool = new DataFileAccessorPool(this); 81 82 private Map <Integer ,DataFile> fileMap=new HashMap <Integer ,DataFile>(); 83 private DataFile currentWriteFile; 84 ControlFile controlFile; 85 86 private Location mark; 87 private final AtomicReference <Location> lastAppendLocation = new AtomicReference <Location>(); 88 boolean started = false; 89 boolean useNio = true; 90 91 protected final ConcurrentHashMap <WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap <WriteKey, WriteCommand>(); 92 93 private Runnable cleanupTask; 94 95 @SuppressWarnings ("unchecked") 96 public synchronized void start() throws IOException { 97 if( started ) { 98 return; 99 } 100 101 102 started=true; 103 directory.mkdirs(); 104 controlFile = new ControlFile(new File (directory, filePrefix+"control"), CONTROL_RECORD_MAX_LENGTH); 105 controlFile.lock(); 106 107 ByteSequence sequence = controlFile.load(); 108 if( sequence != null && sequence.getLength()>0 ) { 109 unmarshallState(sequence); 110 } 111 if( useNio) { 112 appender = new NIODataFileAppender(this); 113 } else { 114 appender = new DataFileAppender(this); 115 } 116 117 File [] files=directory.listFiles(new FilenameFilter (){ 118 public boolean accept(File dir,String n){ 119 return dir.equals(dir)&&n.startsWith(filePrefix); 120 } 121 }); 122 123 if(files!=null){ 124 for(int i=0;i<files.length;i++){ 125 try { 126 File file=files[i]; 127 String n=file.getName(); 128 String numStr=n.substring(filePrefix.length(),n.length()); 129 int num=Integer.parseInt(numStr); 130 DataFile dataFile=new DataFile(file,num, preferedFileLength); 131 fileMap.put(dataFile.getDataFileId(),dataFile); 132 } catch (NumberFormatException e) { 133 } 135 } 136 137 ArrayList <DataFile> l = new ArrayList <DataFile>(fileMap.values()); 139 Collections.sort(l); 140 currentWriteFile=null; 141 for (DataFile df : l) { 142 if( currentWriteFile!=null ) { 143 currentWriteFile.linkAfter(df); 144 } 145 currentWriteFile=df; 146 } 147 } 148 149 if( currentWriteFile!=null ) { 151 152 Location l = lastAppendLocation.get(); 154 if( l!=null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue() ) { 155 l=null; 156 } 157 158 l = recoveryCheck(currentWriteFile, l); 160 lastAppendLocation.set(l); 161 } 162 163 storeState(false); 164 165 cleanupTask = new Runnable (){ 166 public void run() { 167 cleanup(); 168 }}; 169 Scheduler.executePeriodically(cleanupTask, 1000*30); 170 } 171 172 private Location recoveryCheck(DataFile dataFile, Location location) throws IOException { 173 if( location == null ) { 174 location = new Location(); 175 location.setDataFileId(dataFile.getDataFileId()); 176 location.setOffset(0); 177 } 178 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 179 try { 180 reader.readLocationDetails(location); 181 while( reader.readLocationDetailsAndValidate(location) ) { 182 location.setOffset(location.getOffset()+location.getSize()); 183 } 184 } finally { 185 accessorPool.closeDataFileAccessor(reader); 186 } 187 dataFile.setLength(location.getOffset()); 188 return location; 189 } 190 191 private void unmarshallState(ByteSequence sequence) throws IOException { 192 ByteArrayInputStream bais = new ByteArrayInputStream (sequence.getData(), sequence.getOffset(), sequence.getLength()); 193 DataInputStream dis = new DataInputStream (bais); 194 if( dis.readBoolean() ) { 195 mark = new Location(); 196 mark.readExternal(dis); 197 } else { 198 mark = null; 199 } 200 if( dis.readBoolean() ) { 201 Location l = new Location(); 202 l.readExternal(dis); 203 lastAppendLocation.set(l); 204 } else { 205 lastAppendLocation.set(null); 206 } 207 } 208 209 private synchronized ByteSequence marshallState() throws IOException { 210 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 211 DataOutputStream dos = new DataOutputStream (baos); 212 213 if( mark!=null ) { 214 dos.writeBoolean(true); 215 mark.writeExternal(dos); 216 } else { 217 dos.writeBoolean(false); 218 } 219 Location l = lastAppendLocation.get(); 220 if( l!=null ) { 221 dos.writeBoolean(true); 222 l.writeExternal(dos); 223 } else { 224 dos.writeBoolean(false); 225 } 226 227 byte[] bs = baos.toByteArray(); 228 return new ByteSequence(bs,0,bs.length); 229 } 230 231 synchronized DataFile allocateLocation(Location location) throws IOException { 232 if(currentWriteFile==null||((currentWriteFile.getLength()+location.getSize())>maxFileLength)){ 233 int nextNum=currentWriteFile!=null?currentWriteFile.getDataFileId().intValue()+1:1; 234 235 String fileName=filePrefix+nextNum; 236 DataFile nextWriteFile=new DataFile(new File (directory,fileName),nextNum, preferedFileLength); 237 fileMap.put(nextWriteFile.getDataFileId(),nextWriteFile); 238 if( currentWriteFile!=null ) { 239 currentWriteFile.linkAfter(nextWriteFile); 240 if(currentWriteFile.isUnused()){ 241 removeDataFile(currentWriteFile); 242 } 243 } 244 currentWriteFile=nextWriteFile; 245 246 } 247 location.setOffset(currentWriteFile.getLength()); 248 location.setDataFileId(currentWriteFile.getDataFileId().intValue()); 249 currentWriteFile.incrementLength(location.getSize()); 250 currentWriteFile.increment(); 251 return currentWriteFile; 252 } 253 254 DataFile getDataFile(Location item) throws IOException { 255 Integer key=new Integer (item.getDataFileId()); 256 DataFile dataFile=(DataFile) fileMap.get(key); 257 if(dataFile==null){ 258 log.error("Looking for key " + key + " but not found in fileMap: " + fileMap); 259 throw new IOException ("Could not locate data file "+filePrefix+"-"+item.getDataFileId()); 260 } 261 return dataFile; 262 } 263 264 private DataFile getNextDataFile(DataFile dataFile) { 265 return (DataFile) dataFile.getNext(); 266 } 267 268 public void close() throws IOException { 269 synchronized(this){ 270 if(!started){ 271 return; 272 } 273 Scheduler.cancel(cleanupTask); 274 accessorPool.close(); 275 } 276 storeState(false); 277 appender.close(); 278 fileMap.clear(); 279 controlFile.unlock(); 280 controlFile.dispose(); 281 started=false; 282 } 283 284 private synchronized void cleanup() { 285 if( accessorPool!=null ) { 286 accessorPool.disposeUnused(); 287 } 288 } 289 public synchronized boolean delete() throws IOException { 290 291 appender.close(); 293 accessorPool.close(); 294 295 boolean result=true; 296 for(Iterator i=fileMap.values().iterator();i.hasNext();){ 297 DataFile dataFile=(DataFile) i.next(); 298 result&=dataFile.delete(); 299 } 300 fileMap.clear(); 301 lastAppendLocation.set(null); 302 mark=null; 303 currentWriteFile=null; 304 305 accessorPool = new DataFileAccessorPool(this); 307 if( useNio) { 308 appender = new NIODataFileAppender(this); 309 } else { 310 appender = new DataFileAppender(this); 311 } 312 return result; 313 } 314 315 public synchronized void addInterestInFile(int file) throws IOException { 316 if(file>=0){ 317 Integer key=new Integer (file); 318 DataFile dataFile=(DataFile) fileMap.get(key); 319 if(dataFile==null){ 320 throw new IOException ("That data file does not exist"); 321 } 322 addInterestInFile(dataFile); 323 } 324 } 325 326 synchronized void addInterestInFile(DataFile dataFile){ 327 if(dataFile!=null){ 328 dataFile.increment(); 329 } 330 } 331 332 public synchronized void removeInterestInFile(int file) throws IOException { 333 if(file>=0){ 334 Integer key=new Integer (file); 335 DataFile dataFile=(DataFile) fileMap.get(key); 336 removeInterestInFile(dataFile); 337 } 338 } 339 340 synchronized void removeInterestInFile(DataFile dataFile) throws IOException { 341 if(dataFile!=null){ 342 if(dataFile.decrement()<=0){ 343 removeDataFile(dataFile); 344 } 345 } 346 } 347 348 349 synchronized public void consolidateDataFilesNotIn(Set <Integer > inUse) throws IOException { 350 351 Set <Integer > unUsed = new HashSet <Integer >(fileMap.keySet()); 353 unUsed.removeAll(inUse); 354 355 List <DataFile> purgeList=new ArrayList <DataFile>(); 356 for (Integer key : unUsed) { 357 DataFile dataFile=(DataFile) fileMap.get(key); 358 purgeList.add(dataFile); 359 } 360 361 for (DataFile dataFile : purgeList) { 362 removeDataFile(dataFile); 363 } 364 } 365 366 public synchronized void consolidateDataFiles() throws IOException { 367 List <DataFile> purgeList=new ArrayList <DataFile>(); 368 for (DataFile dataFile : fileMap.values()) { 369 if( dataFile.isUnused() ){ 370 purgeList.add(dataFile); 371 } 372 } 373 for (DataFile dataFile : purgeList) { 374 removeDataFile(dataFile); 375 } 376 } 377 378 private void removeDataFile(DataFile dataFile) throws IOException { 379 380 if( dataFile==currentWriteFile || mark==null || dataFile.getDataFileId() >= mark.getDataFileId() ) { 382 return; 383 } 384 385 accessorPool.disposeDataFileAccessors(dataFile); 386 387 fileMap.remove(dataFile.getDataFileId()); 388 dataFile.unlink(); 389 boolean result=dataFile.delete(); 390 log.debug("discarding data file "+dataFile+(result?"successful ":"failed")); 391 392 } 393 394 395 398 public int getMaxFileLength(){ 399 return maxFileLength; 400 } 401 402 405 public void setMaxFileLength(int maxFileLength){ 406 this.maxFileLength=maxFileLength; 407 } 408 409 public String toString(){ 410 return "DataManager:("+filePrefix+")"; 411 } 412 413 public synchronized Location getMark() throws IllegalStateException { 414 return mark; 415 } 416 417 public Location getNextLocation(Location location) throws IOException , IllegalStateException { 418 419 420 Location cur = null; 421 while( true ) { 422 if( cur == null ) { 423 if( location == null ) { 424 DataFile head = (DataFile) currentWriteFile.getHeadNode(); 425 cur = new Location(); 426 cur.setDataFileId(head.getDataFileId()); 427 cur.setOffset(0); 428 429 } else { 438 cur = new Location(location); 440 cur.setOffset(cur.getOffset()+cur.getSize()); 441 } 442 } else { 443 cur.setOffset(cur.getOffset()+cur.getSize()); 444 } 445 446 DataFile dataFile = getDataFile(cur); 447 448 if( dataFile.getLength() <= cur.getOffset() ) { 450 dataFile = getNextDataFile(dataFile); 451 if( dataFile == null ) { 452 return null; 453 } else { 454 cur.setDataFileId(dataFile.getDataFileId().intValue()); 455 cur.setOffset(0); 456 } 457 } 458 459 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 461 try { 462 reader.readLocationDetails(cur); 463 } finally { 464 accessorPool.closeDataFileAccessor(reader); 465 } 466 467 if( cur.getType() == 0 ) { 468 return null; 469 } else if( cur.getType() > 0 ) { 470 return cur; 472 } 473 } 474 } 475 476 public ByteSequence read(Location location) throws IOException , IllegalStateException { 477 DataFile dataFile = getDataFile(location); 478 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 479 ByteSequence rc=null; 480 try { 481 rc = reader.readRecord(location); 482 } finally { 483 accessorPool.closeDataFileAccessor(reader); 484 } 485 return rc; 486 } 487 488 public void setMark(Location location, boolean sync) throws IOException , IllegalStateException { 489 synchronized(this) { 490 mark = location; 491 } 492 storeState(sync); 493 } 494 495 private void storeState(boolean sync) throws IOException { 496 ByteSequence state = marshallState(); 497 appender.storeItem(state, Location.MARK_TYPE, sync); 498 controlFile.store(state, sync); 499 } 500 501 public Location write(ByteSequence data, boolean sync) throws IOException , IllegalStateException { 502 return appender.storeItem(data, Location.USER_TYPE, sync); 503 } 504 505 public Location write(ByteSequence data, byte type, boolean sync) throws IOException , IllegalStateException { 506 return appender.storeItem(data, type, sync); 507 } 508 509 public void update(Location location, ByteSequence data, boolean sync) throws IOException { 510 DataFile dataFile = getDataFile(location); 511 DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile); 512 try { 513 updater.updateRecord(location, data, sync); 514 } finally { 515 accessorPool.closeDataFileAccessor(updater); 516 } 517 } 518 519 public File getDirectory() { 520 return directory; 521 } 522 523 public void setDirectory(File directory) { 524 this.directory = directory; 525 } 526 527 public String getFilePrefix() { 528 return filePrefix; 529 } 530 531 public void setFilePrefix(String filePrefix) { 532 this.filePrefix = filePrefix; 533 } 534 535 public ConcurrentHashMap <WriteKey, WriteCommand> getInflightWrites() { 536 return inflightWrites; 537 } 538 539 public Location getLastAppendLocation() { 540 return lastAppendLocation.get(); 541 } 542 543 public void setLastAppendLocation(Location lastSyncedLocation) { 544 this.lastAppendLocation.set(lastSyncedLocation); 545 } 546 547 } 548 | Popular Tags |