1 14 15 package org.apache.activemq.kaha.impl.index.hash; 16 17 import java.io.File ; 18 import java.io.IOException ; 19 import java.io.RandomAccessFile ; 20 import java.util.concurrent.atomic.AtomicBoolean ; 21 import org.apache.activemq.kaha.Marshaller; 22 import org.apache.activemq.kaha.StoreEntry; 23 import org.apache.activemq.kaha.impl.index.Index; 24 import org.apache.activemq.kaha.impl.index.IndexManager; 25 import org.apache.activemq.util.DataByteArrayInputStream; 26 import org.apache.activemq.util.DataByteArrayOutputStream; 27 import org.apache.activemq.util.LRUCache; 28 import org.apache.commons.logging.Log; 29 import org.apache.commons.logging.LogFactory; 30 31 36 public class HashIndex implements Index{ 37 38 private static final String NAME_PREFIX="hash-index-"; 39 private static final int DEFAULT_PAGE_SIZE; 40 private static final int DEFAULT_KEY_SIZE; 41 private static final Log log=LogFactory.getLog(HashIndex.class); 42 private final String name; 43 private File directory; 44 private File file; 45 private RandomAccessFile indexFile; 46 private IndexManager indexManager; 47 private int pageSize=DEFAULT_PAGE_SIZE; 48 private int keySize=DEFAULT_KEY_SIZE; 49 private int keysPerPage=pageSize/keySize; 50 private DataByteArrayInputStream dataIn; 51 private DataByteArrayOutputStream dataOut; 52 private byte[] readBuffer; 53 private HashBin[] bins; 54 private Marshaller keyMarshaller; 55 private long length=0; 56 private HashPage firstFree; 57 private HashPage lastFree; 58 private AtomicBoolean loaded=new AtomicBoolean (); 59 private LRUCache<Long ,HashPage> pageCache; 60 private boolean enablePageCaching=false; 61 private int pageCacheSize=10; 62 63 71 public HashIndex(File directory,String name,IndexManager indexManager) throws IOException { 72 this(directory,name,indexManager,1024); 73 } 74 75 84 public HashIndex(File directory,String name,IndexManager indexManager,int numberOfBins) throws IOException { 85 this.directory=directory; 86 this.name=name; 87 this.indexManager=indexManager; 88 int capacity=1; 89 while(capacity<numberOfBins) 90 capacity<<=1; 91 this.bins=new HashBin[capacity]; 92 openIndexFile(); 93 pageCache=new LRUCache<Long ,HashPage>(pageCacheSize,pageCacheSize,0.75f,true); 94 } 95 96 101 public synchronized void setKeyMarshaller(Marshaller marshaller){ 102 this.keyMarshaller=marshaller; 103 } 104 105 108 public synchronized int getKeySize(){ 109 return this.keySize; 110 } 111 112 115 public synchronized void setKeySize(int keySize){ 116 this.keySize=keySize; 117 if(loaded.get()){ 118 throw new RuntimeException ("Pages already loaded - can't reset key size"); 119 } 120 } 121 122 125 public synchronized int getPageSize(){ 126 return this.pageSize; 127 } 128 129 132 public synchronized void setPageSize(int pageSize){ 133 if(loaded.get()&&pageSize!=this.pageSize){ 134 throw new RuntimeException ("Pages already loaded - can't reset page size"); 135 } 136 this.pageSize=pageSize; 137 } 138 139 140 143 public synchronized boolean isEnablePageCaching(){ 144 return this.enablePageCaching; 145 } 146 147 150 public synchronized void setEnablePageCaching(boolean enablePageCaching){ 151 this.enablePageCaching=enablePageCaching; 152 } 153 154 157 public synchronized int getPageCacheSize(){ 158 return this.pageCacheSize; 159 } 160 161 164 public synchronized void setPageCacheSize(int pageCacheSize){ 165 this.pageCacheSize=pageCacheSize; 166 pageCache.setMaxCacheSize(pageCacheSize); 167 } 168 169 170 public synchronized boolean isTransient(){ 171 return false; 172 } 173 174 public synchronized void load(){ 175 if(loaded.compareAndSet(false,true)){ 176 keysPerPage=pageSize/keySize; 177 dataIn=new DataByteArrayInputStream(); 178 dataOut=new DataByteArrayOutputStream(pageSize); 179 readBuffer=new byte[pageSize]; 180 try{ 181 openIndexFile(); 182 long offset=0; 183 while((offset+pageSize)<=indexFile.length()){ 184 indexFile.seek(offset); 185 indexFile.readFully(readBuffer,0,HashPage.PAGE_HEADER_SIZE); 186 dataIn.restart(readBuffer); 187 HashPage page=new HashPage(keysPerPage); 188 page.setId(offset); 189 page.readHeader(dataIn); 190 if(!page.isActive()){ 191 if(lastFree!=null){ 192 lastFree.setNextFreePageId(offset); 193 indexFile.seek(lastFree.getId()); 194 dataOut.reset(); 195 lastFree.writeHeader(dataOut); 196 indexFile.write(dataOut.getData(),0,HashPage.PAGE_HEADER_SIZE); 197 lastFree=page; 198 }else{ 199 lastFree=firstFree=page; 200 } 201 }else{ 202 addToBin(page); 203 } 204 offset+=pageSize; 205 } 206 length=offset; 207 }catch(IOException e){ 208 log.error("Failed to load index ",e); 209 throw new RuntimeException (e); 210 } 211 } 212 } 213 214 public synchronized void unload() throws IOException { 215 if(loaded.compareAndSet(true,false)){ 216 if(indexFile!=null){ 217 indexFile.close(); 218 indexFile=null; 219 firstFree=lastFree=null; 220 bins=new HashBin[bins.length]; 221 } 222 } 223 } 224 225 public synchronized void store(Object key,StoreEntry value) throws IOException { 226 load(); 227 HashEntry entry=new HashEntry(); 228 entry.setKey((Comparable )key); 229 entry.setIndexOffset(value.getOffset()); 230 getBin(key).put(entry); 231 } 232 233 public synchronized StoreEntry get(Object key) throws IOException { 234 load(); 235 HashEntry entry=new HashEntry(); 236 entry.setKey((Comparable )key); 237 HashEntry result=getBin(key).find(entry); 238 return result!=null?indexManager.getIndex(result.getIndexOffset()):null; 239 } 240 241 public synchronized StoreEntry remove(Object key) throws IOException { 242 load(); 243 HashEntry entry=new HashEntry(); 244 entry.setKey((Comparable )key); 245 HashEntry result = getBin(key).remove(entry); 246 return result!=null?indexManager.getIndex(result.getIndexOffset()):null; 247 } 248 249 public synchronized boolean containsKey(Object key) throws IOException { 250 return get(key)!=null; 251 } 252 253 public synchronized void clear() throws IOException { 254 unload(); 255 delete(); 256 openIndexFile(); 257 load(); 258 } 259 260 public synchronized void delete() throws IOException { 261 unload(); 262 if(file.exists()){ 263 boolean result=file.delete(); 264 } 265 length=0; 266 } 267 268 HashPage lookupPage(long pageId) throws IOException { 269 HashPage result=null; 270 if(pageId>=0){ 271 result=getFromCache(pageId); 272 if(result==null){ 273 result=getFullPage(pageId); 274 if(result!=null){ 275 if(result.isActive()){ 276 addToCache(result); 277 }else{ 278 throw new IllegalStateException ("Trying to access an inactive page: "+pageId); 279 } 280 } 281 } 282 } 283 return result; 284 } 285 286 HashPage createPage(int binId) throws IOException { 287 HashPage result=getNextFreePage(); 288 if(result==null){ 289 result=new HashPage(keysPerPage); 291 result.setId(length); 292 result.setBinId(binId); 293 writePageHeader(result); 294 length+=pageSize; 295 indexFile.seek(length); 296 indexFile.write(HashEntry.NOT_SET); 297 } 298 addToCache(result); 299 return result; 300 } 301 302 void releasePage(HashPage page) throws IOException { 303 removeFromCache(page); 304 page.reset(); 305 page.setActive(false); 306 if(lastFree==null){ 307 firstFree=lastFree=page; 308 }else{ 309 lastFree.setNextFreePageId(page.getId()); 310 writePageHeader(lastFree); 311 } 312 writePageHeader(page); 313 } 314 315 private HashPage getNextFreePage() throws IOException { 316 HashPage result=null; 317 if(firstFree!=null){ 318 if(firstFree.equals(lastFree)){ 319 result=firstFree; 320 firstFree=lastFree=null; 321 }else{ 322 result=firstFree; 323 firstFree=getPageHeader(firstFree.getNextFreePageId()); 324 if(firstFree==null){ 325 lastFree=null; 326 } 327 } 328 result.setActive(true); 329 result.reset(); 330 writePageHeader(result); 331 } 332 return result; 333 } 334 335 void writeFullPage(HashPage page) throws IOException { 336 dataOut.reset(); 337 page.write(keyMarshaller,dataOut); 338 if(dataOut.size()>pageSize){ 339 throw new IOException ("Page Size overflow: pageSize is "+pageSize+" trying to write "+dataOut.size()); 340 } 341 indexFile.seek(page.getId()); 342 indexFile.write(dataOut.getData(),0,dataOut.size()); 343 } 344 345 void writePageHeader(HashPage page) throws IOException { 346 dataOut.reset(); 347 page.writeHeader(dataOut); 348 indexFile.seek(page.getId()); 349 indexFile.write(dataOut.getData(),0,HashPage.PAGE_HEADER_SIZE); 350 } 351 352 HashPage getFullPage(long id) throws IOException { 353 indexFile.seek(id); 354 indexFile.readFully(readBuffer,0,pageSize); 355 dataIn.restart(readBuffer); 356 HashPage page=new HashPage(keysPerPage); 357 page.setId(id); 358 page.read(keyMarshaller,dataIn); 359 return page; 360 } 361 362 HashPage getPageHeader(long id) throws IOException { 363 indexFile.seek(id); 364 indexFile.readFully(readBuffer,0,HashPage.PAGE_HEADER_SIZE); 365 dataIn.restart(readBuffer); 366 HashPage page=new HashPage(keysPerPage); 367 page.setId(id); 368 page.readHeader(dataIn); 369 return page; 370 } 371 372 void addToBin(HashPage page){ 373 HashBin bin=getBin(page.getBinId()); 374 bin.addHashPageInfo(page.getId(),page.getPersistedSize()); 375 } 376 377 private HashBin getBin(int index){ 378 HashBin result=bins[index]; 379 if(result==null){ 380 result=new HashBin(this,index,pageSize/keySize); 381 bins[index]=result; 382 } 383 return result; 384 } 385 386 private void openIndexFile() throws IOException { 387 if(indexFile==null){ 388 file=new File (directory,NAME_PREFIX+name); 389 indexFile=new RandomAccessFile (file,"rw"); 390 } 391 } 392 393 private HashBin getBin(Object key){ 394 int hash=hash(key); 395 int i=indexFor(hash,bins.length); 396 return getBin(i); 397 } 398 399 private HashPage getFromCache(long pageId){ 400 HashPage result=null; 401 if(enablePageCaching){ 402 result=pageCache.get(pageId); 403 } 404 return result; 405 } 406 407 private void addToCache(HashPage page){ 408 if(enablePageCaching){ 409 pageCache.put(page.getId(),page); 410 } 411 } 412 413 private void removeFromCache(HashPage page){ 414 if(enablePageCaching){ 415 pageCache.remove(page.getId()); 416 } 417 } 418 419 static int hash(Object x){ 420 int h=x.hashCode(); 421 h+=~(h<<9); 422 h^=(h>>>14); 423 h+=(h<<4); 424 h^=(h>>>10); 425 return h; 426 } 427 428 static int indexFor(int h,int length){ 429 return h&(length-1); 430 } 431 static{ 432 DEFAULT_PAGE_SIZE=Integer.parseInt(System.getProperty("defaultPageSize","16384")); 433 DEFAULT_KEY_SIZE=Integer.parseInt(System.getProperty("defaultKeySize","96")); 434 } 435 } 436 | Popular Tags |