1 18 19 package org.apache.activemq.kaha.impl; 20 21 import java.io.File ; 22 import java.io.IOException ; 23 import java.nio.channels.FileLock ; 24 import java.util.HashSet ; 25 import java.util.Iterator ; 26 import java.util.Map ; 27 import java.util.Properties ; 28 import java.util.Set ; 29 import java.util.concurrent.ConcurrentHashMap ; 30 import org.apache.activemq.kaha.ContainerId; 31 import org.apache.activemq.kaha.ListContainer; 32 import org.apache.activemq.kaha.MapContainer; 33 import org.apache.activemq.kaha.RuntimeStoreException; 34 import org.apache.activemq.kaha.Store; 35 import org.apache.activemq.kaha.StoreLocation; 36 import org.apache.activemq.kaha.impl.async.AsyncDataManager; 37 import org.apache.activemq.kaha.impl.async.DataManagerFacade; 38 import org.apache.activemq.kaha.impl.container.ListContainerImpl; 39 import org.apache.activemq.kaha.impl.container.MapContainerImpl; 40 import org.apache.activemq.kaha.impl.data.DataManagerImpl; 41 import org.apache.activemq.kaha.impl.data.Item; 42 import org.apache.activemq.kaha.impl.data.RedoListener; 43 import org.apache.activemq.kaha.impl.index.IndexItem; 44 import org.apache.activemq.kaha.impl.index.IndexManager; 45 import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem; 46 import org.apache.commons.logging.Log; 47 import org.apache.commons.logging.LogFactory; 48 49 54 public class KahaStore implements Store{ 55 56 private static final String LOCK_FILE_NAME="store.lock"; 57 58 private final static String PROPERTY_PREFIX="org.apache.activemq.kaha.Store"; 59 private final static boolean brokenFileLock="true".equals(System.getProperty(PROPERTY_PREFIX+".broken","false")); 60 private final static boolean disableLocking="true".equals(System.getProperty(PROPERTY_PREFIX+"DisableLocking", 61 "false")); 62 private static Set <String > lockSet; 63 private static final Log log=LogFactory.getLog(KahaStore.class); 64 private File directory; 65 private IndexRootContainer mapsContainer; 66 private IndexRootContainer listsContainer; 67 private Map <ContainerId, ListContainerImpl> lists=new ConcurrentHashMap <ContainerId, ListContainerImpl>(); 68 private Map <ContainerId, MapContainerImpl> maps=new ConcurrentHashMap <ContainerId, MapContainerImpl>(); 69 private Map <String , DataManager> dataManagers=new ConcurrentHashMap <String , DataManager>(); 70 private Map <String , IndexManager> indexManagers=new ConcurrentHashMap <String , IndexManager>(); 71 private IndexManager rootIndexManager; private boolean closed=false; 73 private String mode; 74 private boolean initialized; 75 private boolean logIndexChanges=false; 76 private boolean useAsyncDataManager=false; 77 private long maxDataFileLength=1024*1024*32; 78 private FileLock lock; 79 private boolean persistentIndex=true; 80 81 public KahaStore(String name,String mode) throws IOException { 82 this.mode=mode; 83 directory=new File (name); 84 directory.mkdirs(); 85 } 86 87 public synchronized void close() throws IOException { 88 if(!closed){ 89 closed=true; 90 if(initialized){ 91 unlock(); 92 93 for (ListContainerImpl container : lists.values()) { 94 container.close(); 95 } 96 lists.clear(); 97 for (MapContainerImpl container : maps.values()) { 98 container.close(); 99 } 100 maps.clear(); 101 for(Iterator <IndexManager> iter=indexManagers.values().iterator();iter.hasNext();){ 102 IndexManager im=iter.next(); 103 im.close(); 104 iter.remove(); 105 } 106 for(Iterator <DataManager> iter=dataManagers.values().iterator();iter.hasNext();){ 107 DataManager dm=iter.next(); 108 dm.close(); 109 iter.remove(); 110 } 111 } 112 } 113 } 114 115 public synchronized void force() throws IOException { 116 if(initialized){ 117 for(Iterator <IndexManager> iter=indexManagers.values().iterator();iter.hasNext();){ 118 IndexManager im=iter.next(); 119 im.force(); 120 } 121 for(Iterator <DataManager> iter=dataManagers.values().iterator();iter.hasNext();){ 122 DataManager dm=iter.next(); 123 dm.force(); 124 } 125 } 126 } 127 128 public synchronized void clear() throws IOException { 129 initialize(); 130 for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) { 131 ContainerId id = (ContainerId)i.next(); 132 MapContainer container = getMapContainer(id.getKey(),id.getDataContainerName()); 133 container.clear(); 134 } 135 for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) { 136 ContainerId id = (ContainerId)i.next(); 137 ListContainer container = getListContainer(id.getKey(),id.getDataContainerName()); 138 container.clear(); 139 } 140 141 } 142 143 public synchronized boolean delete() throws IOException { 144 boolean result=true; 145 if(initialized){ 146 clear(); 147 for(Iterator <IndexManager> iter=indexManagers.values().iterator();iter.hasNext();){ 148 IndexManager im=iter.next(); 149 result&=im.delete(); 150 iter.remove(); 151 } 152 for(Iterator <DataManager> iter=dataManagers.values().iterator();iter.hasNext();){ 153 DataManager dm=iter.next(); 154 result&=dm.delete(); 155 iter.remove(); 156 } 157 } 158 if(directory!=null&&directory.isDirectory()){ 159 File [] files=directory.listFiles(); 160 if(files!=null){ 161 for(int i=0;i<files.length;i++){ 162 File file=files[i]; 163 if(!file.isDirectory()){ 164 result&=file.delete(); 165 } 166 } 167 } 168 String str=result?"successfully deleted":"failed to delete"; 169 log.info("Kaha Store "+str+" data directory "+directory); 170 } 171 return result; 172 } 173 174 public synchronized boolean isInitialized(){ 175 return initialized; 176 } 177 178 public boolean doesMapContainerExist(Object id) throws IOException { 179 return doesMapContainerExist(id,DEFAULT_CONTAINER_NAME); 180 } 181 182 public synchronized boolean doesMapContainerExist(Object id,String containerName) throws IOException { 183 initialize(); 184 ContainerId containerId=new ContainerId(); 185 containerId.setKey(id); 186 containerId.setDataContainerName(containerName); 187 return maps.containsKey(containerId)||mapsContainer.doesRootExist(containerId); 188 } 189 190 public MapContainer getMapContainer(Object id) throws IOException { 191 return getMapContainer(id,DEFAULT_CONTAINER_NAME); 192 } 193 194 public MapContainer getMapContainer(Object id,String containerName) throws IOException { 195 return getMapContainer(id,containerName,persistentIndex); 196 } 197 198 public synchronized MapContainer getMapContainer(Object id,String containerName,boolean persistentIndex) 199 throws IOException { 200 initialize(); 201 ContainerId containerId=new ContainerId(); 202 containerId.setKey(id); 203 containerId.setDataContainerName(containerName); 204 MapContainerImpl result=maps.get(containerId); 205 if(result==null){ 206 DataManager dm=getDataManager(containerName); 207 IndexManager im=getIndexManager(dm,containerName); 208 IndexItem root=mapsContainer.getRoot(im,containerId); 209 if(root==null){ 210 root=mapsContainer.addRoot(im,containerId); 211 } 212 result=new MapContainerImpl(directory,containerId,root,im,dm,persistentIndex); 213 maps.put(containerId,result); 214 } 215 return result; 216 } 217 218 public void deleteMapContainer(Object id) throws IOException { 219 deleteMapContainer(id,DEFAULT_CONTAINER_NAME); 220 } 221 222 public void deleteMapContainer(Object id,String containerName) throws IOException { 223 ContainerId containerId = new ContainerId(id,containerName); 224 deleteMapContainer(containerId); 225 } 226 227 public synchronized void deleteMapContainer(ContainerId containerId) throws IOException { 228 initialize(); 229 MapContainerImpl container=maps.remove(containerId); 230 if(container!=null){ 231 container.clear(); 232 mapsContainer.removeRoot(container.getIndexManager(),containerId); 233 container.close(); 234 } 235 } 236 237 public synchronized Set <ContainerId> getMapContainerIds() throws IOException { 238 initialize(); 239 Set <ContainerId> set = new HashSet <ContainerId>(); 240 for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) { 241 ContainerId id = (ContainerId)i.next(); 242 set.add(id); 243 } 244 return set; 245 } 246 247 public boolean doesListContainerExist(Object id) throws IOException { 248 return doesListContainerExist(id,DEFAULT_CONTAINER_NAME); 249 } 250 251 public synchronized boolean doesListContainerExist(Object id,String containerName) throws IOException { 252 initialize(); 253 ContainerId containerId=new ContainerId(); 254 containerId.setKey(id); 255 containerId.setDataContainerName(containerName); 256 return lists.containsKey(containerId)||listsContainer.doesRootExist(containerId); 257 } 258 259 public ListContainer getListContainer(Object id) throws IOException { 260 return getListContainer(id,DEFAULT_CONTAINER_NAME); 261 } 262 263 public ListContainer getListContainer(Object id,String containerName) throws IOException { 264 return getListContainer(id,containerName,persistentIndex); 265 } 266 267 public synchronized ListContainer getListContainer(Object id,String containerName,boolean persistentIndex) 268 throws IOException { 269 initialize(); 270 ContainerId containerId=new ContainerId(); 271 containerId.setKey(id); 272 containerId.setDataContainerName(containerName); 273 ListContainerImpl result=lists.get(containerId); 274 if(result==null){ 275 DataManager dm=getDataManager(containerName); 276 IndexManager im=getIndexManager(dm,containerName); 277 278 IndexItem root=listsContainer.getRoot(im,containerId); 279 if(root==null){ 280 root=listsContainer.addRoot(im,containerId); 281 } 282 result=new ListContainerImpl(containerId,root,im,dm,persistentIndex); 283 lists.put(containerId,result); 284 } 285 return result; 286 } 287 288 public void deleteListContainer(Object id) throws IOException { 289 deleteListContainer(id,DEFAULT_CONTAINER_NAME); 290 } 291 292 public synchronized void deleteListContainer(Object id,String containerName) throws IOException { 293 ContainerId containerId=new ContainerId(id,containerName); 294 deleteListContainer(containerId); 295 } 296 297 public synchronized void deleteListContainer(ContainerId containerId) throws IOException { 298 initialize(); 299 ListContainerImpl container=lists.remove(containerId); 300 if(container!=null){ 301 listsContainer.removeRoot(container.getIndexManager(),containerId); 302 container.clear(); 303 container.close(); 304 } 305 } 306 307 public synchronized Set <ContainerId> getListContainerIds() throws IOException { 308 initialize(); 309 Set <ContainerId> set = new HashSet <ContainerId>(); 310 for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) { 311 ContainerId id = (ContainerId)i.next(); 312 set.add(id); 313 } 314 return set; 315 } 316 317 318 319 322 public IndexRootContainer getListsContainer(){ 323 return this.listsContainer; 324 } 325 326 327 330 public IndexRootContainer getMapsContainer(){ 331 return this.mapsContainer; 332 } 333 334 public synchronized DataManager getDataManager(String name) throws IOException { 335 DataManager dm=dataManagers.get(name); 336 if(dm==null){ 337 if( isUseAsyncDataManager() ) { 338 AsyncDataManager t=new AsyncDataManager(); 339 t.setDirectory(directory); 340 t.setFilePrefix("async-data-"+name+"-"); 341 t.setMaxFileLength((int) maxDataFileLength); 342 t.start(); 343 dm=new DataManagerFacade(t, name); 344 } else { 345 DataManagerImpl t=new DataManagerImpl(directory,name); 346 t.setMaxFileLength(maxDataFileLength); 347 dm=t; 348 } 349 if( logIndexChanges ) { 350 recover(dm); 351 } 352 dataManagers.put(name,dm); 353 } 354 return dm; 355 } 356 357 public synchronized IndexManager getIndexManager(DataManager dm,String name) throws IOException { 358 IndexManager im=indexManagers.get(name); 359 if(im==null){ 360 im=new IndexManager(directory,name,mode,logIndexChanges?dm:null); 361 indexManagers.put(name,im); 362 } 363 return im; 364 } 365 366 private void recover(final DataManager dm) throws IOException { 367 dm.recoverRedoItems(new RedoListener(){ 368 public void onRedoItem(StoreLocation item,Object o) throws Exception { 369 RedoStoreIndexItem redo=(RedoStoreIndexItem)o; 370 IndexManager im=getIndexManager(dm,dm.getName()); 372 im.redo(redo); 373 } 374 }); 375 } 376 377 public synchronized boolean isLogIndexChanges(){ 378 return logIndexChanges; 379 } 380 381 public synchronized void setLogIndexChanges(boolean logIndexChanges){ 382 this.logIndexChanges=logIndexChanges; 383 } 384 385 388 public synchronized long getMaxDataFileLength(){ 389 return maxDataFileLength; 390 } 391 392 396 public synchronized void setMaxDataFileLength(long maxDataFileLength){ 397 this.maxDataFileLength=maxDataFileLength; 398 } 399 400 404 public synchronized String getIndexTypeAsString(){ 405 return persistentIndex ? "PERSISTENT":"VM"; 406 } 407 408 414 public synchronized void setIndexTypeAsString(String type){ 415 if(type.equalsIgnoreCase("VM")){ 416 persistentIndex=false; 417 }else{ 418 persistentIndex=true; 419 } 420 } 421 422 public synchronized void initialize() throws IOException { 423 if(closed) 424 throw new IOException ("Store has been closed."); 425 if(!initialized){ 426 427 log.info("Kaha Store using data directory "+directory); 428 DataManager defaultDM=getDataManager(DEFAULT_CONTAINER_NAME); 429 rootIndexManager=getIndexManager(defaultDM,DEFAULT_CONTAINER_NAME); 430 IndexItem mapRoot=new IndexItem(); 431 IndexItem listRoot=new IndexItem(); 432 if(rootIndexManager.isEmpty()){ 433 mapRoot.setOffset(0); 434 rootIndexManager.storeIndex(mapRoot); 435 listRoot.setOffset(IndexItem.INDEX_SIZE); 436 rootIndexManager.storeIndex(listRoot); 437 rootIndexManager.setLength(IndexItem.INDEX_SIZE*2); 438 }else{ 439 mapRoot=rootIndexManager.getIndex(0); 440 listRoot=rootIndexManager.getIndex(IndexItem.INDEX_SIZE); 441 } 442 lock(); 443 initialized=true; 444 mapsContainer=new IndexRootContainer(mapRoot,rootIndexManager,defaultDM); 445 listsContainer=new IndexRootContainer(listRoot,rootIndexManager,defaultDM); 446 449 generateInterestInMapDataFiles(); 450 generateInterestInListDataFiles(); 451 for(Iterator <DataManager> i=dataManagers.values().iterator();i.hasNext();){ 452 DataManager dm=i.next(); 453 dm.consolidateDataFiles(); 454 } 455 } 456 } 457 458 private synchronized void lock() throws IOException { 459 if(!disableLocking&&directory!=null&&lock==null){ 460 Set <String > set=getVmLockSet(); 461 synchronized(set){ 462 if(lock==null){ 463 if(!set.add(directory.getCanonicalPath())){ 464 throw new StoreLockedExcpetion("Kaha Store "+directory.getName() 465 +" is already opened by this application."); 466 } 467 if(!brokenFileLock){ 468 lock=rootIndexManager.getLock(); 469 if(lock==null){ 470 set.remove(directory.getCanonicalPath()); 471 throw new StoreLockedExcpetion("Kaha Store "+directory.getName() 472 +" is already opened by another application"); 473 } 474 } 475 } 476 } 477 } 478 } 479 480 private synchronized void unlock() throws IOException { 481 if(!disableLocking&&directory!=null){ 482 Set <String > set=getVmLockSet(); 483 synchronized(set){ 484 if(lock!=null){ 485 set.remove(directory.getCanonicalPath()); 486 if(lock.isValid()){ 487 lock.release(); 488 } 489 lock=null; 490 } 491 } 492 } 493 } 494 495 private void checkClosed(){ 496 if(closed){ 497 throw new RuntimeStoreException("The store is closed"); 498 } 499 } 500 501 502 503 static synchronized private Set <String > getVmLockSet(){ 504 if(lockSet==null){ 505 Properties properties=System.getProperties(); 506 synchronized(properties){ 507 lockSet=(Set <String >) properties.get("org.apache.activemq.kaha.impl.KahaStore"); 508 if(lockSet==null){ 509 lockSet=new HashSet <String >(); 510 } 511 properties.put(PROPERTY_PREFIX,lockSet); 512 } 513 } 514 return lockSet; 515 } 516 517 521 private void generateInterestInListDataFiles() throws IOException { 522 for(Iterator i=listsContainer.getKeys().iterator();i.hasNext();){ 523 ContainerId id=(ContainerId)i.next(); 524 DataManager dm=getDataManager(id.getDataContainerName()); 525 IndexManager im=getIndexManager(dm,id.getDataContainerName()); 526 IndexItem theRoot=listsContainer.getRoot(im,id); 527 long nextItem=theRoot.getNextItem(); 528 while(nextItem!=Item.POSITION_NOT_SET){ 529 IndexItem item=im.getIndex(nextItem); 530 item.setOffset(nextItem); 531 dm.addInterestInFile(item.getKeyFile()); 532 dm.addInterestInFile(item.getValueFile()); 533 nextItem=item.getNextItem(); 534 } 535 } 536 } 537 538 543 private void generateInterestInMapDataFiles() throws IOException { 544 for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) { 545 ContainerId id = (ContainerId)i.next(); 546 DataManager dm = getDataManager(id.getDataContainerName()); 547 IndexManager im = getIndexManager(dm,id.getDataContainerName()); 548 IndexItem theRoot=mapsContainer.getRoot(im,id); 549 long nextItem=theRoot.getNextItem(); 550 while(nextItem!=Item.POSITION_NOT_SET){ 551 IndexItem item=im.getIndex(nextItem); 552 item.setOffset(nextItem); 553 dm.addInterestInFile(item.getKeyFile()); 554 dm.addInterestInFile(item.getValueFile()); 555 nextItem=item.getNextItem(); 556 } 557 558 } 559 } 560 561 public synchronized boolean isUseAsyncDataManager() { 562 return useAsyncDataManager; 563 } 564 565 public synchronized void setUseAsyncDataManager(boolean useAsyncWriter) { 566 this.useAsyncDataManager = useAsyncWriter; 567 } 568 569 570 571 } 572 | Popular Tags |