KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > kaha > impl > data > DataManagerImpl


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.kaha.impl.data;
19
20 import java.io.File JavaDoc;
21 import java.io.FilenameFilter JavaDoc;
22 import java.io.IOException JavaDoc;
23 import java.util.ArrayList JavaDoc;
24 import java.util.HashMap JavaDoc;
25 import java.util.Iterator JavaDoc;
26 import java.util.List JavaDoc;
27 import java.util.Map JavaDoc;
28
29 import org.apache.activemq.kaha.Marshaller;
30 import org.apache.activemq.kaha.StoreLocation;
31 import org.apache.activemq.kaha.impl.DataManager;
32 import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
33 import org.apache.activemq.util.IOExceptionSupport;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 /**
37  * Manages DataFiles
38  *
39  * @version $Revision: 1.1.1.1 $
40  */

41 public final class DataManagerImpl implements DataManager {
42     
43     private static final Log log=LogFactory.getLog(DataManagerImpl.class);
44     public static long MAX_FILE_LENGTH=1024*1024*32;
45     private static final String JavaDoc NAME_PREFIX="data-";
46     private final File JavaDoc dir;
47     private final String JavaDoc name;
48     private SyncDataFileReader reader;
49     private SyncDataFileWriter writer;
50     private DataFile currentWriteFile;
51     private long maxFileLength = MAX_FILE_LENGTH;
52     Map JavaDoc fileMap=new HashMap JavaDoc();
53     
54     public static final int ITEM_HEAD_SIZE=5; // type + length
55
public static final byte DATA_ITEM_TYPE=1;
56     public static final byte REDO_ITEM_TYPE=2;
57     
58     Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
59     private String JavaDoc dataFilePrefix;
60    
61     public DataManagerImpl(File JavaDoc dir, final String JavaDoc name){
62         this.dir=dir;
63         this.name=name;
64         
65         dataFilePrefix = NAME_PREFIX+name+"-";
66         // build up list of current dataFiles
67
File JavaDoc[] files=dir.listFiles(new FilenameFilter JavaDoc(){
68             public boolean accept(File JavaDoc dir,String JavaDoc n){
69                 return dir.equals(dir)&&n.startsWith(dataFilePrefix);
70             }
71         });
72         if(files!=null){
73             for(int i=0;i<files.length;i++){
74                 File JavaDoc file=files[i];
75                 String JavaDoc n=file.getName();
76                 String JavaDoc numStr=n.substring(dataFilePrefix.length(),n.length());
77                 int num=Integer.parseInt(numStr);
78                 DataFile dataFile=new DataFile(file,num);
79                 fileMap.put(dataFile.getNumber(),dataFile);
80                 if(currentWriteFile==null||currentWriteFile.getNumber().intValue()<num){
81                     currentWriteFile=dataFile;
82                 }
83             }
84         }
85     }
86     
87     private DataFile createAndAddDataFile(int num){
88         String JavaDoc fileName=dataFilePrefix+num;
89         File JavaDoc file=new File JavaDoc(dir,fileName);
90         DataFile result=new DataFile(file,num);
91         fileMap.put(result.getNumber(),result);
92         return result;
93     }
94
95     /* (non-Javadoc)
96      * @see org.apache.activemq.kaha.impl.data.IDataManager#getName()
97      */

98     public String JavaDoc getName(){
99         return name;
100     }
101
102     synchronized DataFile findSpaceForData(DataItem item) throws IOException JavaDoc{
103         if(currentWriteFile==null||((currentWriteFile.getLength()+item.getSize())>maxFileLength)){
104             int nextNum=currentWriteFile!=null?currentWriteFile.getNumber().intValue()+1:1;
105             if(currentWriteFile!=null&&currentWriteFile.isUnused()){
106                 removeDataFile(currentWriteFile);
107             }
108             currentWriteFile=createAndAddDataFile(nextNum);
109         }
110         item.setOffset(currentWriteFile.getLength());
111         item.setFile(currentWriteFile.getNumber().intValue());
112         currentWriteFile.incrementLength(item.getSize()+ITEM_HEAD_SIZE);
113         return currentWriteFile;
114     }
115
116     DataFile getDataFile(StoreLocation item) throws IOException JavaDoc{
117         Integer JavaDoc key=new Integer JavaDoc(item.getFile());
118         DataFile dataFile=(DataFile) fileMap.get(key);
119         if(dataFile==null){
120             log.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
121             throw new IOException JavaDoc("Could not locate data file "+NAME_PREFIX+name+"-"+item.getFile());
122         }
123         return dataFile;
124     }
125     
126     /* (non-Javadoc)
127      * @see org.apache.activemq.kaha.impl.data.IDataManager#readItem(org.apache.activemq.kaha.Marshaller, org.apache.activemq.kaha.StoreLocation)
128      */

129     public synchronized Object JavaDoc readItem(Marshaller marshaller, StoreLocation item) throws IOException JavaDoc{
130         return getReader().readItem(marshaller,item);
131     }
132
133     /* (non-Javadoc)
134      * @see org.apache.activemq.kaha.impl.data.IDataManager#storeDataItem(org.apache.activemq.kaha.Marshaller, java.lang.Object)
135      */

136     public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object JavaDoc payload) throws IOException JavaDoc{
137         return getWriter().storeItem(marshaller,payload, DATA_ITEM_TYPE);
138     }
139     
140     /* (non-Javadoc)
141      * @see org.apache.activemq.kaha.impl.data.IDataManager#storeRedoItem(java.lang.Object)
142      */

143     public synchronized StoreLocation storeRedoItem(Object JavaDoc payload) throws IOException JavaDoc{
144         return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
145     }
146     
147     /* (non-Javadoc)
148      * @see org.apache.activemq.kaha.impl.data.IDataManager#updateItem(org.apache.activemq.kaha.StoreLocation, org.apache.activemq.kaha.Marshaller, java.lang.Object)
149      */

150     public synchronized void updateItem(StoreLocation location,Marshaller marshaller, Object JavaDoc payload) throws IOException JavaDoc {
151         getWriter().updateItem((DataItem)location,marshaller,payload,DATA_ITEM_TYPE);
152     }
153
154     /* (non-Javadoc)
155      * @see org.apache.activemq.kaha.impl.data.IDataManager#recoverRedoItems(org.apache.activemq.kaha.impl.data.RedoListener)
156      */

157     public synchronized void recoverRedoItems(RedoListener listener) throws IOException JavaDoc{
158         
159         // Nothing to recover if there is no current file.
160
if( currentWriteFile == null )
161             return;
162         
163         DataItem item = new DataItem();
164         item.setFile(currentWriteFile.getNumber().intValue());
165         item.setOffset(0);
166         while( true ) {
167             byte type;
168             try {
169                 type = getReader().readDataItemSize(item);
170             } catch (IOException JavaDoc ignore) {
171                 log.trace("End of data file reached at (header was invalid): "+item);
172                 return;
173             }
174             if( type == REDO_ITEM_TYPE ) {
175                 // Un-marshal the redo item
176
Object JavaDoc object;
177                 try {
178                     object = readItem(redoMarshaller, item);
179                 } catch (IOException JavaDoc e1) {
180                     log.trace("End of data file reached at (payload was invalid): "+item);
181                     return;
182                 }
183                 try {
184                     
185                     listener.onRedoItem(item, object);
186                     // in case the listener is holding on to item references, copy it
187
// so we don't change it behind the listener's back.
188
item = item.copy();
189                     
190                 } catch (Exception JavaDoc e) {
191                     throw IOExceptionSupport.create("Recovery handler failed: "+e,e);
192                 }
193             }
194             // Move to the next item.
195
item.setOffset(item.getOffset()+ITEM_HEAD_SIZE+item.getSize());
196         }
197     }
198     
199     /* (non-Javadoc)
200      * @see org.apache.activemq.kaha.impl.data.IDataManager#close()
201      */

202     public synchronized void close() throws IOException JavaDoc{
203         getWriter().close();
204         for(Iterator JavaDoc i=fileMap.values().iterator();i.hasNext();){
205             DataFile dataFile=(DataFile) i.next();
206             getWriter().force(dataFile);
207             dataFile.close();
208         }
209         fileMap.clear();
210     }
211
212     /* (non-Javadoc)
213      * @see org.apache.activemq.kaha.impl.data.IDataManager#force()
214      */

215     public synchronized void force() throws IOException JavaDoc{
216         for(Iterator JavaDoc i=fileMap.values().iterator();i.hasNext();){
217             DataFile dataFile=(DataFile) i.next();
218             getWriter().force(dataFile);
219         }
220     }
221
222         
223     /* (non-Javadoc)
224      * @see org.apache.activemq.kaha.impl.data.IDataManager#delete()
225      */

226     public synchronized boolean delete() throws IOException JavaDoc{
227         boolean result=true;
228         for(Iterator JavaDoc i=fileMap.values().iterator();i.hasNext();){
229             DataFile dataFile=(DataFile) i.next();
230             result&=dataFile.delete();
231         }
232         fileMap.clear();
233         return result;
234     }
235     
236
237     /* (non-Javadoc)
238      * @see org.apache.activemq.kaha.impl.data.IDataManager#addInterestInFile(int)
239      */

240     public synchronized void addInterestInFile(int file) throws IOException JavaDoc{
241         if(file>=0){
242             Integer JavaDoc key=new Integer JavaDoc(file);
243             DataFile dataFile=(DataFile) fileMap.get(key);
244             if(dataFile==null){
245                 dataFile=createAndAddDataFile(file);
246             }
247             addInterestInFile(dataFile);
248         }
249     }
250
251     synchronized void addInterestInFile(DataFile dataFile){
252         if(dataFile!=null){
253             dataFile.increment();
254         }
255     }
256
257     /* (non-Javadoc)
258      * @see org.apache.activemq.kaha.impl.data.IDataManager#removeInterestInFile(int)
259      */

260     public synchronized void removeInterestInFile(int file) throws IOException JavaDoc{
261         if(file>=0){
262             Integer JavaDoc key=new Integer JavaDoc(file);
263             DataFile dataFile=(DataFile) fileMap.get(key);
264             removeInterestInFile(dataFile);
265         }
266     }
267
268     synchronized void removeInterestInFile(DataFile dataFile) throws IOException JavaDoc{
269         if(dataFile!=null){
270             if(dataFile.decrement()<=0){
271                 if(dataFile!=currentWriteFile){
272                     removeDataFile(dataFile);
273                 }
274             }
275         }
276     }
277
278     /* (non-Javadoc)
279      * @see org.apache.activemq.kaha.impl.data.IDataManager#consolidateDataFiles()
280      */

281     public synchronized void consolidateDataFiles() throws IOException JavaDoc{
282         List JavaDoc purgeList=new ArrayList JavaDoc();
283         for(Iterator JavaDoc i=fileMap.values().iterator();i.hasNext();){
284             DataFile dataFile=(DataFile) i.next();
285             if(dataFile.isUnused() && dataFile != currentWriteFile){
286                 purgeList.add(dataFile);
287             }
288         }
289         for(int i=0;i<purgeList.size();i++){
290             DataFile dataFile=(DataFile) purgeList.get(i);
291             removeDataFile(dataFile);
292         }
293     }
294
295     private void removeDataFile(DataFile dataFile) throws IOException JavaDoc{
296         fileMap.remove(dataFile.getNumber());
297         if(writer!=null){
298             writer.force(dataFile);
299         }
300         boolean result=dataFile.delete();
301         log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
302     }
303
304     /* (non-Javadoc)
305      * @see org.apache.activemq.kaha.impl.data.IDataManager#getRedoMarshaller()
306      */

307     public Marshaller getRedoMarshaller() {
308         return redoMarshaller;
309     }
310
311     /* (non-Javadoc)
312      * @see org.apache.activemq.kaha.impl.data.IDataManager#setRedoMarshaller(org.apache.activemq.kaha.Marshaller)
313      */

314     public void setRedoMarshaller(Marshaller redoMarshaller) {
315         this.redoMarshaller = redoMarshaller;
316     }
317
318     /**
319      * @return the maxFileLength
320      */

321     public long getMaxFileLength(){
322         return maxFileLength;
323     }
324
325     /**
326      * @param maxFileLength the maxFileLength to set
327      */

328     public void setMaxFileLength(long maxFileLength){
329         this.maxFileLength=maxFileLength;
330     }
331     
332     public String JavaDoc toString(){
333         return "DataManager:("+NAME_PREFIX+name+")";
334     }
335
336     public synchronized SyncDataFileReader getReader() {
337         if( reader == null ) {
338             reader = createReader();
339         }
340         return reader;
341     }
342     protected synchronized SyncDataFileReader createReader() {
343         return new SyncDataFileReader(this);
344     }
345     public synchronized void setReader(SyncDataFileReader reader) {
346         this.reader = reader;
347     }
348
349     public synchronized SyncDataFileWriter getWriter() {
350         if( writer==null ) {
351             writer = createWriter();
352         }
353         return writer;
354     }
355     private SyncDataFileWriter createWriter() {
356         return new SyncDataFileWriter(this);
357     }
358     public synchronized void setWriter(SyncDataFileWriter writer) {
359         this.writer = writer;
360     }
361
362 }
363
Popular Tags