KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > kaha > impl > KahaStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with this
5  * work for additional information regarding copyright ownership. The ASF
6  * licenses this file to You under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15  * License for the specific language governing permissions and limitations under
16  * the License.
17  */

18
19 package org.apache.activemq.kaha.impl;
20
21 import java.io.File JavaDoc;
22 import java.io.IOException JavaDoc;
23 import java.nio.channels.FileLock JavaDoc;
24 import java.util.HashSet JavaDoc;
25 import java.util.Iterator JavaDoc;
26 import java.util.Map JavaDoc;
27 import java.util.Properties JavaDoc;
28 import java.util.Set JavaDoc;
29 import java.util.concurrent.ConcurrentHashMap JavaDoc;
30 import org.apache.activemq.kaha.ContainerId;
31 import org.apache.activemq.kaha.ListContainer;
32 import org.apache.activemq.kaha.MapContainer;
33 import org.apache.activemq.kaha.RuntimeStoreException;
34 import org.apache.activemq.kaha.Store;
35 import org.apache.activemq.kaha.StoreLocation;
36 import org.apache.activemq.kaha.impl.async.AsyncDataManager;
37 import org.apache.activemq.kaha.impl.async.DataManagerFacade;
38 import org.apache.activemq.kaha.impl.container.ListContainerImpl;
39 import org.apache.activemq.kaha.impl.container.MapContainerImpl;
40 import org.apache.activemq.kaha.impl.data.DataManagerImpl;
41 import org.apache.activemq.kaha.impl.data.Item;
42 import org.apache.activemq.kaha.impl.data.RedoListener;
43 import org.apache.activemq.kaha.impl.index.IndexItem;
44 import org.apache.activemq.kaha.impl.index.IndexManager;
45 import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48
49 /**
50  * Store Implementation
51  *
52  * @version $Revision: 1.1.1.1 $
53  */

54 public class KahaStore implements Store{
55
56     private static final String JavaDoc LOCK_FILE_NAME="store.lock";
57     
58     private final static String JavaDoc PROPERTY_PREFIX="org.apache.activemq.kaha.Store";
59     private final static boolean brokenFileLock="true".equals(System.getProperty(PROPERTY_PREFIX+".broken","false"));
60     private final static boolean disableLocking="true".equals(System.getProperty(PROPERTY_PREFIX+"DisableLocking",
61             "false"));
62     private static Set JavaDoc<String JavaDoc> lockSet;
63     private static final Log log=LogFactory.getLog(KahaStore.class);
64     private File JavaDoc directory;
65     private IndexRootContainer mapsContainer;
66     private IndexRootContainer listsContainer;
67     private Map JavaDoc<ContainerId, ListContainerImpl> lists=new ConcurrentHashMap JavaDoc<ContainerId, ListContainerImpl>();
68     private Map JavaDoc<ContainerId, MapContainerImpl> maps=new ConcurrentHashMap JavaDoc<ContainerId, MapContainerImpl>();
69     private Map JavaDoc<String JavaDoc, DataManager> dataManagers=new ConcurrentHashMap JavaDoc<String JavaDoc, DataManager>();
70     private Map JavaDoc<String JavaDoc, IndexManager> indexManagers=new ConcurrentHashMap JavaDoc<String JavaDoc, IndexManager>();
71     private IndexManager rootIndexManager; // contains all the root indexes
72
private boolean closed=false;
73     private String JavaDoc mode;
74     private boolean initialized;
75     private boolean logIndexChanges=false;
76     private boolean useAsyncDataManager=false;
77     private long maxDataFileLength=1024*1024*32;
78     private FileLock JavaDoc lock;
79     private boolean persistentIndex=true;
80
81     public KahaStore(String JavaDoc name,String JavaDoc mode) throws IOException JavaDoc{
82         this.mode=mode;
83         directory=new File JavaDoc(name);
84         directory.mkdirs();
85     }
86
87     public synchronized void close() throws IOException JavaDoc{
88         if(!closed){
89             closed=true;
90             if(initialized){
91                 unlock();
92                 
93                 for (ListContainerImpl container : lists.values()) {
94                     container.close();
95                 }
96                 lists.clear();
97                 for (MapContainerImpl container : maps.values()) {
98                     container.close();
99                 }
100                 maps.clear();
101                 for(Iterator JavaDoc<IndexManager> iter=indexManagers.values().iterator();iter.hasNext();){
102                     IndexManager im=iter.next();
103                     im.close();
104                     iter.remove();
105                 }
106                 for(Iterator JavaDoc<DataManager> iter=dataManagers.values().iterator();iter.hasNext();){
107                     DataManager dm=iter.next();
108                     dm.close();
109                     iter.remove();
110                 }
111             }
112         }
113     }
114
115     public synchronized void force() throws IOException JavaDoc{
116         if(initialized){
117             for(Iterator JavaDoc<IndexManager> iter=indexManagers.values().iterator();iter.hasNext();){
118                 IndexManager im=iter.next();
119                 im.force();
120             }
121             for(Iterator JavaDoc<DataManager> iter=dataManagers.values().iterator();iter.hasNext();){
122                 DataManager dm=iter.next();
123                 dm.force();
124             }
125         }
126     }
127
128     public synchronized void clear() throws IOException JavaDoc{
129         initialize();
130         for (Iterator JavaDoc i = mapsContainer.getKeys().iterator(); i.hasNext();) {
131             ContainerId id = (ContainerId)i.next();
132             MapContainer container = getMapContainer(id.getKey(),id.getDataContainerName());
133             container.clear();
134         }
135         for (Iterator JavaDoc i = listsContainer.getKeys().iterator(); i.hasNext();) {
136             ContainerId id = (ContainerId)i.next();
137             ListContainer container = getListContainer(id.getKey(),id.getDataContainerName());
138             container.clear();
139         }
140         
141     }
142
143     public synchronized boolean delete() throws IOException JavaDoc{
144         boolean result=true;
145         if(initialized){
146             clear();
147             for(Iterator JavaDoc<IndexManager> iter=indexManagers.values().iterator();iter.hasNext();){
148                 IndexManager im=iter.next();
149                 result&=im.delete();
150                 iter.remove();
151             }
152             for(Iterator JavaDoc<DataManager> iter=dataManagers.values().iterator();iter.hasNext();){
153                 DataManager dm=iter.next();
154                 result&=dm.delete();
155                 iter.remove();
156             }
157         }
158         if(directory!=null&&directory.isDirectory()){
159             File JavaDoc[] files=directory.listFiles();
160             if(files!=null){
161                 for(int i=0;i<files.length;i++){
162                     File JavaDoc file=files[i];
163                     if(!file.isDirectory()){
164                         result&=file.delete();
165                     }
166                 }
167             }
168             String JavaDoc str=result?"successfully deleted":"failed to delete";
169             log.info("Kaha Store "+str+" data directory "+directory);
170         }
171         return result;
172     }
173     
174     public synchronized boolean isInitialized(){
175         return initialized;
176     }
177
178     public boolean doesMapContainerExist(Object JavaDoc id) throws IOException JavaDoc{
179         return doesMapContainerExist(id,DEFAULT_CONTAINER_NAME);
180     }
181
182     public synchronized boolean doesMapContainerExist(Object JavaDoc id,String JavaDoc containerName) throws IOException JavaDoc{
183         initialize();
184         ContainerId containerId=new ContainerId();
185         containerId.setKey(id);
186         containerId.setDataContainerName(containerName);
187         return maps.containsKey(containerId)||mapsContainer.doesRootExist(containerId);
188     }
189
190     public MapContainer getMapContainer(Object JavaDoc id) throws IOException JavaDoc{
191         return getMapContainer(id,DEFAULT_CONTAINER_NAME);
192     }
193
194     public MapContainer getMapContainer(Object JavaDoc id,String JavaDoc containerName) throws IOException JavaDoc{
195         return getMapContainer(id,containerName,persistentIndex);
196     }
197
198     public synchronized MapContainer getMapContainer(Object JavaDoc id,String JavaDoc containerName,boolean persistentIndex)
199             throws IOException JavaDoc{
200         initialize();
201         ContainerId containerId=new ContainerId();
202         containerId.setKey(id);
203         containerId.setDataContainerName(containerName);
204         MapContainerImpl result=maps.get(containerId);
205         if(result==null){
206             DataManager dm=getDataManager(containerName);
207             IndexManager im=getIndexManager(dm,containerName);
208             IndexItem root=mapsContainer.getRoot(im,containerId);
209             if(root==null){
210                 root=mapsContainer.addRoot(im,containerId);
211             }
212             result=new MapContainerImpl(directory,containerId,root,im,dm,persistentIndex);
213             maps.put(containerId,result);
214         }
215         return result;
216     }
217
218     public void deleteMapContainer(Object JavaDoc id) throws IOException JavaDoc{
219         deleteMapContainer(id,DEFAULT_CONTAINER_NAME);
220     }
221     
222     public void deleteMapContainer(Object JavaDoc id,String JavaDoc containerName) throws IOException JavaDoc{
223         ContainerId containerId = new ContainerId(id,containerName);
224         deleteMapContainer(containerId);
225     }
226
227     public synchronized void deleteMapContainer(ContainerId containerId) throws IOException JavaDoc{
228         initialize();
229         MapContainerImpl container=maps.remove(containerId);
230         if(container!=null){
231             container.clear();
232             mapsContainer.removeRoot(container.getIndexManager(),containerId);
233             container.close();
234         }
235     }
236
237     public synchronized Set JavaDoc<ContainerId> getMapContainerIds() throws IOException JavaDoc{
238         initialize();
239         Set JavaDoc<ContainerId> set = new HashSet JavaDoc<ContainerId>();
240         for (Iterator JavaDoc i = mapsContainer.getKeys().iterator(); i.hasNext();) {
241             ContainerId id = (ContainerId)i.next();
242             set.add(id);
243         }
244         return set;
245     }
246
247     public boolean doesListContainerExist(Object JavaDoc id) throws IOException JavaDoc{
248         return doesListContainerExist(id,DEFAULT_CONTAINER_NAME);
249     }
250
251     public synchronized boolean doesListContainerExist(Object JavaDoc id,String JavaDoc containerName) throws IOException JavaDoc{
252         initialize();
253         ContainerId containerId=new ContainerId();
254         containerId.setKey(id);
255         containerId.setDataContainerName(containerName);
256         return lists.containsKey(containerId)||listsContainer.doesRootExist(containerId);
257     }
258
259     public ListContainer getListContainer(Object JavaDoc id) throws IOException JavaDoc{
260         return getListContainer(id,DEFAULT_CONTAINER_NAME);
261     }
262
263     public ListContainer getListContainer(Object JavaDoc id,String JavaDoc containerName) throws IOException JavaDoc{
264         return getListContainer(id,containerName,persistentIndex);
265     }
266
267     public synchronized ListContainer getListContainer(Object JavaDoc id,String JavaDoc containerName,boolean persistentIndex)
268             throws IOException JavaDoc{
269         initialize();
270         ContainerId containerId=new ContainerId();
271         containerId.setKey(id);
272         containerId.setDataContainerName(containerName);
273         ListContainerImpl result=lists.get(containerId);
274         if(result==null){
275             DataManager dm=getDataManager(containerName);
276             IndexManager im=getIndexManager(dm,containerName);
277             
278             IndexItem root=listsContainer.getRoot(im,containerId);
279             if(root==null){
280                 root=listsContainer.addRoot(im,containerId);
281             }
282             result=new ListContainerImpl(containerId,root,im,dm,persistentIndex);
283             lists.put(containerId,result);
284         }
285         return result;
286     }
287
288     public void deleteListContainer(Object JavaDoc id) throws IOException JavaDoc{
289         deleteListContainer(id,DEFAULT_CONTAINER_NAME);
290     }
291     
292     public synchronized void deleteListContainer(Object JavaDoc id,String JavaDoc containerName) throws IOException JavaDoc{
293         ContainerId containerId=new ContainerId(id,containerName);
294         deleteListContainer(containerId);
295     }
296
297     public synchronized void deleteListContainer(ContainerId containerId) throws IOException JavaDoc{
298         initialize();
299         ListContainerImpl container=lists.remove(containerId);
300         if(container!=null){
301             listsContainer.removeRoot(container.getIndexManager(),containerId);
302             container.clear();
303             container.close();
304         }
305     }
306
307     public synchronized Set JavaDoc<ContainerId> getListContainerIds() throws IOException JavaDoc{
308         initialize();
309         Set JavaDoc<ContainerId> set = new HashSet JavaDoc<ContainerId>();
310         for (Iterator JavaDoc i = listsContainer.getKeys().iterator(); i.hasNext();) {
311             ContainerId id = (ContainerId)i.next();
312             set.add(id);
313         }
314         return set;
315     }
316
317     
318     
319     /**
320      * @return the listsContainer
321      */

322     public IndexRootContainer getListsContainer(){
323         return this.listsContainer;
324     }
325
326     
327     /**
328      * @return the mapsContainer
329      */

330     public IndexRootContainer getMapsContainer(){
331         return this.mapsContainer;
332     }
333
334     public synchronized DataManager getDataManager(String JavaDoc name) throws IOException JavaDoc{
335         DataManager dm=dataManagers.get(name);
336         if(dm==null){
337             if( isUseAsyncDataManager() ) {
338                 AsyncDataManager t=new AsyncDataManager();
339                 t.setDirectory(directory);
340                 t.setFilePrefix("async-data-"+name+"-");
341                 t.setMaxFileLength((int) maxDataFileLength);
342                 t.start();
343                 dm=new DataManagerFacade(t, name);
344             } else {
345                 DataManagerImpl t=new DataManagerImpl(directory,name);
346                 t.setMaxFileLength(maxDataFileLength);
347                 dm=t;
348             }
349             if( logIndexChanges ) {
350                 recover(dm);
351             }
352             dataManagers.put(name,dm);
353         }
354         return dm;
355     }
356
357     public synchronized IndexManager getIndexManager(DataManager dm,String JavaDoc name) throws IOException JavaDoc{
358         IndexManager im=indexManagers.get(name);
359         if(im==null){
360             im=new IndexManager(directory,name,mode,logIndexChanges?dm:null);
361             indexManagers.put(name,im);
362         }
363         return im;
364     }
365
366     private void recover(final DataManager dm) throws IOException JavaDoc{
367         dm.recoverRedoItems(new RedoListener(){
368             public void onRedoItem(StoreLocation item,Object JavaDoc o) throws Exception JavaDoc{
369                 RedoStoreIndexItem redo=(RedoStoreIndexItem)o;
370                 // IndexManager im = getIndexManager(dm, redo.getIndexName());
371
IndexManager im=getIndexManager(dm,dm.getName());
372                 im.redo(redo);
373             }
374         });
375     }
376
377     public synchronized boolean isLogIndexChanges(){
378         return logIndexChanges;
379     }
380
381     public synchronized void setLogIndexChanges(boolean logIndexChanges){
382         this.logIndexChanges=logIndexChanges;
383     }
384
385     /**
386      * @return the maxDataFileLength
387      */

388     public synchronized long getMaxDataFileLength(){
389         return maxDataFileLength;
390     }
391
392     /**
393      * @param maxDataFileLength
394      * the maxDataFileLength to set
395      */

396     public synchronized void setMaxDataFileLength(long maxDataFileLength){
397         this.maxDataFileLength=maxDataFileLength;
398     }
399
400     /**
401      * @see org.apache.activemq.kaha.IndexTypes
402      * @return the default index type
403      */

404     public synchronized String JavaDoc getIndexTypeAsString(){
405         return persistentIndex ? "PERSISTENT":"VM";
406     }
407
408     /**
409      * Set the default index type
410      *
411      * @param type
412      * @see org.apache.activemq.kaha.IndexTypes
413      */

414     public synchronized void setIndexTypeAsString(String JavaDoc type){
415         if(type.equalsIgnoreCase("VM")){
416             persistentIndex=false;
417         }else{
418             persistentIndex=true;
419         }
420     }
421     
422     public synchronized void initialize() throws IOException JavaDoc{
423         if(closed)
424             throw new IOException JavaDoc("Store has been closed.");
425         if(!initialized){
426            
427             log.info("Kaha Store using data directory "+directory);
428             DataManager defaultDM=getDataManager(DEFAULT_CONTAINER_NAME);
429             rootIndexManager=getIndexManager(defaultDM,DEFAULT_CONTAINER_NAME);
430             IndexItem mapRoot=new IndexItem();
431             IndexItem listRoot=new IndexItem();
432             if(rootIndexManager.isEmpty()){
433                 mapRoot.setOffset(0);
434                 rootIndexManager.storeIndex(mapRoot);
435                 listRoot.setOffset(IndexItem.INDEX_SIZE);
436                 rootIndexManager.storeIndex(listRoot);
437                 rootIndexManager.setLength(IndexItem.INDEX_SIZE*2);
438             }else{
439                 mapRoot=rootIndexManager.getIndex(0);
440                 listRoot=rootIndexManager.getIndex(IndexItem.INDEX_SIZE);
441             }
442             lock();
443             initialized=true;
444             mapsContainer=new IndexRootContainer(mapRoot,rootIndexManager,defaultDM);
445             listsContainer=new IndexRootContainer(listRoot,rootIndexManager,defaultDM);
446             /**
447              * Add interest in data files - then consolidate them
448              */

449             generateInterestInMapDataFiles();
450             generateInterestInListDataFiles();
451             for(Iterator JavaDoc<DataManager> i=dataManagers.values().iterator();i.hasNext();){
452                 DataManager dm=i.next();
453                 dm.consolidateDataFiles();
454             }
455         }
456     }
457
458     private synchronized void lock() throws IOException JavaDoc{
459         if(!disableLocking&&directory!=null&&lock==null){
460             Set JavaDoc<String JavaDoc> set=getVmLockSet();
461             synchronized(set){
462                 if(lock==null){
463                     if(!set.add(directory.getCanonicalPath())){
464                         throw new StoreLockedExcpetion("Kaha Store "+directory.getName()
465                                 +" is already opened by this application.");
466                     }
467                     if(!brokenFileLock){
468                         lock=rootIndexManager.getLock();
469                         if(lock==null){
470                             set.remove(directory.getCanonicalPath());
471                             throw new StoreLockedExcpetion("Kaha Store "+directory.getName()
472                                     +" is already opened by another application");
473                         }
474                     }
475                 }
476             }
477         }
478     }
479
480     private synchronized void unlock() throws IOException JavaDoc{
481         if(!disableLocking&&directory!=null){
482             Set JavaDoc<String JavaDoc> set=getVmLockSet();
483             synchronized(set){
484                 if(lock!=null){
485                     set.remove(directory.getCanonicalPath());
486                     if(lock.isValid()){
487                         lock.release();
488                     }
489                     lock=null;
490                 }
491             }
492         }
493     }
494     
495     private void checkClosed(){
496         if(closed){
497             throw new RuntimeStoreException("The store is closed");
498         }
499     }
500
501     
502
503     static synchronized private Set JavaDoc<String JavaDoc> getVmLockSet(){
504         if(lockSet==null){
505             Properties JavaDoc properties=System.getProperties();
506             synchronized(properties){
507                 lockSet=(Set JavaDoc<String JavaDoc>) properties.get("org.apache.activemq.kaha.impl.KahaStore");
508                 if(lockSet==null){
509                     lockSet=new HashSet JavaDoc<String JavaDoc>();
510                 }
511                 properties.put(PROPERTY_PREFIX,lockSet);
512             }
513         }
514         return lockSet;
515     }
516     
517     /**
518      * scans the directory and builds up the IndexManager and DataManager
519      * @throws IOException
520      */

521     private void generateInterestInListDataFiles() throws IOException JavaDoc{
522         for(Iterator JavaDoc i=listsContainer.getKeys().iterator();i.hasNext();){
523             ContainerId id=(ContainerId)i.next();
524             DataManager dm=getDataManager(id.getDataContainerName());
525             IndexManager im=getIndexManager(dm,id.getDataContainerName());
526             IndexItem theRoot=listsContainer.getRoot(im,id);
527             long nextItem=theRoot.getNextItem();
528             while(nextItem!=Item.POSITION_NOT_SET){
529                 IndexItem item=im.getIndex(nextItem);
530                 item.setOffset(nextItem);
531                 dm.addInterestInFile(item.getKeyFile());
532                 dm.addInterestInFile(item.getValueFile());
533                 nextItem=item.getNextItem();
534             }
535         }
536     }
537     
538     /**
539      * scans the directory and builds up the IndexManager and DataManager
540      *
541      * @throws IOException
542      */

543     private void generateInterestInMapDataFiles() throws IOException JavaDoc {
544         for (Iterator JavaDoc i = mapsContainer.getKeys().iterator(); i.hasNext();) {
545             ContainerId id = (ContainerId)i.next();
546             DataManager dm = getDataManager(id.getDataContainerName());
547             IndexManager im = getIndexManager(dm,id.getDataContainerName());
548             IndexItem theRoot=mapsContainer.getRoot(im,id);
549             long nextItem=theRoot.getNextItem();
550             while(nextItem!=Item.POSITION_NOT_SET){
551                 IndexItem item=im.getIndex(nextItem);
552                 item.setOffset(nextItem);
553                 dm.addInterestInFile(item.getKeyFile());
554                 dm.addInterestInFile(item.getValueFile());
555                 nextItem=item.getNextItem();
556             }
557             
558         }
559     }
560
561     public synchronized boolean isUseAsyncDataManager() {
562         return useAsyncDataManager;
563     }
564
565     public synchronized void setUseAsyncDataManager(boolean useAsyncWriter) {
566         this.useAsyncDataManager = useAsyncWriter;
567     }
568
569     
570    
571 }
572
Popular Tags