KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > kaha > impl > async > AsyncDataManager


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.kaha.impl.async;
19
20 import java.io.ByteArrayInputStream JavaDoc;
21 import java.io.ByteArrayOutputStream JavaDoc;
22 import java.io.DataInputStream JavaDoc;
23 import java.io.DataOutputStream JavaDoc;
24 import java.io.File JavaDoc;
25 import java.io.FilenameFilter JavaDoc;
26 import java.io.IOException JavaDoc;
27 import java.util.ArrayList JavaDoc;
28 import java.util.Collections JavaDoc;
29 import java.util.HashMap JavaDoc;
30 import java.util.HashSet JavaDoc;
31 import java.util.Iterator JavaDoc;
32 import java.util.List JavaDoc;
33 import java.util.Map JavaDoc;
34 import java.util.Set JavaDoc;
35 import java.util.concurrent.ConcurrentHashMap JavaDoc;
36 import java.util.concurrent.atomic.AtomicReference JavaDoc;
37
38 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
39 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
40 import org.apache.activemq.thread.Scheduler;
41 import org.apache.activemq.util.ByteSequence;
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44
45 /**
46  * Manages DataFiles
47  *
48  * @version $Revision: 1.1.1.1 $
49  */

50 public final class AsyncDataManager {
51
52     private static final Log log=LogFactory.getLog(AsyncDataManager.class);
53     
54     public static int CONTROL_RECORD_MAX_LENGTH=1024;
55     
56     public static final int ITEM_HEAD_RESERVED_SPACE=21;
57     // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
58
public static final int ITEM_HEAD_SPACE=4+1+ITEM_HEAD_RESERVED_SPACE+3;
59     public static final int ITEM_HEAD_OFFSET_TO_SOR=ITEM_HEAD_SPACE-3;
60     public static final int ITEM_FOOT_SPACE=3; // EOR
61

62     public static final int ITEM_HEAD_FOOT_SPACE=ITEM_HEAD_SPACE+ITEM_FOOT_SPACE;
63
64     public static final byte[] ITEM_HEAD_SOR=new byte[]{'S', 'O', 'R'}; //
65
public static final byte[] ITEM_HEAD_EOR=new byte[]{'E', 'O', 'R'}; //
66

67     public static final byte DATA_ITEM_TYPE=1;
68     public static final byte REDO_ITEM_TYPE=2;
69     
70     public static String JavaDoc DEFAULT_DIRECTORY="data";
71     public static String JavaDoc DEFAULT_FILE_PREFIX="data-";
72     public static int DEFAULT_MAX_FILE_LENGTH=1024*1024*32;
73     
74     private File JavaDoc directory = new File JavaDoc(DEFAULT_DIRECTORY);
75     private String JavaDoc filePrefix=DEFAULT_FILE_PREFIX;
76     private int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
77     private int preferedFileLength = DEFAULT_MAX_FILE_LENGTH-1024*512;
78     
79     private DataFileAppender appender;
80     private DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
81
82     private Map JavaDoc<Integer JavaDoc,DataFile> fileMap=new HashMap JavaDoc<Integer JavaDoc,DataFile>();
83     private DataFile currentWriteFile;
84     ControlFile controlFile;
85     
86     private Location mark;
87     private final AtomicReference JavaDoc<Location> lastAppendLocation = new AtomicReference JavaDoc<Location>();
88     boolean started = false;
89     boolean useNio = true;
90     
91     protected final ConcurrentHashMap JavaDoc<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap JavaDoc<WriteKey, WriteCommand>();
92
93     private Runnable JavaDoc cleanupTask;
94
95     @SuppressWarnings JavaDoc("unchecked")
96     public synchronized void start() throws IOException JavaDoc {
97         if( started ) {
98             return;
99         }
100
101         
102         started=true;
103         directory.mkdirs();
104         controlFile = new ControlFile(new File JavaDoc(directory, filePrefix+"control"), CONTROL_RECORD_MAX_LENGTH);
105         controlFile.lock();
106         
107         ByteSequence sequence = controlFile.load();
108         if( sequence != null && sequence.getLength()>0 ) {
109             unmarshallState(sequence);
110         }
111         if( useNio) {
112             appender = new NIODataFileAppender(this);
113         } else {
114             appender = new DataFileAppender(this);
115         }
116
117         File JavaDoc[] files=directory.listFiles(new FilenameFilter JavaDoc(){
118             public boolean accept(File JavaDoc dir,String JavaDoc n){
119                 return dir.equals(dir)&&n.startsWith(filePrefix);
120             }
121         });
122         
123         if(files!=null){
124             for(int i=0;i<files.length;i++){
125                 try {
126                     File JavaDoc file=files[i];
127                     String JavaDoc n=file.getName();
128                     String JavaDoc numStr=n.substring(filePrefix.length(),n.length());
129                     int num=Integer.parseInt(numStr);
130                     DataFile dataFile=new DataFile(file,num, preferedFileLength);
131                     fileMap.put(dataFile.getDataFileId(),dataFile);
132                 } catch (NumberFormatException JavaDoc e) {
133                     // Ignore file that do not match the patern.
134
}
135             }
136             
137             // Sort the list so that we can link the DataFiles together in the right order.
138
ArrayList JavaDoc<DataFile> l = new ArrayList JavaDoc<DataFile>(fileMap.values());
139             Collections.sort(l);
140             currentWriteFile=null;
141             for (DataFile df : l) {
142                 if( currentWriteFile!=null ) {
143                     currentWriteFile.linkAfter(df);
144                 }
145                 currentWriteFile=df;
146             }
147         }
148         
149         // Need to check the current Write File to see if there was a partial write to it.
150
if( currentWriteFile!=null ) {
151             
152             // See if the lastSyncedLocation is valid..
153
Location l = lastAppendLocation.get();
154             if( l!=null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue() ) {
155                 l=null;
156             }
157             
158             // If we know the last location that was ok.. then we can skip lots of checking
159
l = recoveryCheck(currentWriteFile, l);
160             lastAppendLocation.set(l);
161         }
162         
163         storeState(false);
164         
165         cleanupTask = new Runnable JavaDoc(){
166             public void run() {
167                 cleanup();
168             }};
169         Scheduler.executePeriodically(cleanupTask, 1000*30);
170     }
171     
172     private Location recoveryCheck(DataFile dataFile, Location location) throws IOException JavaDoc {
173         if( location == null ) {
174             location = new Location();
175             location.setDataFileId(dataFile.getDataFileId());
176             location.setOffset(0);
177         }
178         DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
179         try {
180             reader.readLocationDetails(location);
181             while( reader.readLocationDetailsAndValidate(location) ) {
182                 location.setOffset(location.getOffset()+location.getSize());
183             }
184         } finally {
185             accessorPool.closeDataFileAccessor(reader);
186         }
187         dataFile.setLength(location.getOffset());
188         return location;
189     }
190
191     private void unmarshallState(ByteSequence sequence) throws IOException JavaDoc {
192         ByteArrayInputStream JavaDoc bais = new ByteArrayInputStream JavaDoc(sequence.getData(), sequence.getOffset(), sequence.getLength());
193         DataInputStream JavaDoc dis = new DataInputStream JavaDoc(bais);
194         if( dis.readBoolean() ) {
195             mark = new Location();
196             mark.readExternal(dis);
197         } else {
198             mark = null;
199         }
200         if( dis.readBoolean() ) {
201             Location l = new Location();
202             l.readExternal(dis);
203             lastAppendLocation.set(l);
204         } else {
205             lastAppendLocation.set(null);
206         }
207     }
208     
209     private synchronized ByteSequence marshallState() throws IOException JavaDoc {
210         ByteArrayOutputStream JavaDoc baos = new ByteArrayOutputStream JavaDoc();
211         DataOutputStream JavaDoc dos = new DataOutputStream JavaDoc(baos);
212
213         if( mark!=null ) {
214             dos.writeBoolean(true);
215             mark.writeExternal(dos);
216         } else {
217             dos.writeBoolean(false);
218         }
219         Location l = lastAppendLocation.get();
220         if( l!=null ) {
221             dos.writeBoolean(true);
222             l.writeExternal(dos);
223         } else {
224             dos.writeBoolean(false);
225         }
226         
227         byte[] bs = baos.toByteArray();
228         return new ByteSequence(bs,0,bs.length);
229     }
230
231     synchronized DataFile allocateLocation(Location location) throws IOException JavaDoc{
232         if(currentWriteFile==null||((currentWriteFile.getLength()+location.getSize())>maxFileLength)){
233             int nextNum=currentWriteFile!=null?currentWriteFile.getDataFileId().intValue()+1:1;
234
235             String JavaDoc fileName=filePrefix+nextNum;
236             DataFile nextWriteFile=new DataFile(new File JavaDoc(directory,fileName),nextNum, preferedFileLength);
237             fileMap.put(nextWriteFile.getDataFileId(),nextWriteFile);
238             if( currentWriteFile!=null ) {
239                 currentWriteFile.linkAfter(nextWriteFile);
240                 if(currentWriteFile.isUnused()){
241                     removeDataFile(currentWriteFile);
242                 }
243             }
244             currentWriteFile=nextWriteFile;
245                         
246         }
247         location.setOffset(currentWriteFile.getLength());
248         location.setDataFileId(currentWriteFile.getDataFileId().intValue());
249         currentWriteFile.incrementLength(location.getSize());
250         currentWriteFile.increment();
251         return currentWriteFile;
252     }
253
254     DataFile getDataFile(Location item) throws IOException JavaDoc{
255         Integer JavaDoc key=new Integer JavaDoc(item.getDataFileId());
256         DataFile dataFile=(DataFile) fileMap.get(key);
257         if(dataFile==null){
258             log.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
259             throw new IOException JavaDoc("Could not locate data file "+filePrefix+"-"+item.getDataFileId());
260         }
261         return dataFile;
262     }
263     
264     private DataFile getNextDataFile(DataFile dataFile) {
265         return (DataFile) dataFile.getNext();
266     }
267
268     public void close() throws IOException JavaDoc{
269         synchronized(this){
270             if(!started){
271                 return;
272             }
273             Scheduler.cancel(cleanupTask);
274             accessorPool.close();
275         }
276         storeState(false);
277         appender.close();
278         fileMap.clear();
279         controlFile.unlock();
280         controlFile.dispose();
281         started=false;
282     }
283
284     private synchronized void cleanup() {
285         if( accessorPool!=null ) {
286             accessorPool.disposeUnused();
287         }
288     }
289     public synchronized boolean delete() throws IOException JavaDoc{
290         
291         // Close all open file handles...
292
appender.close();
293         accessorPool.close();
294         
295         boolean result=true;
296         for(Iterator JavaDoc i=fileMap.values().iterator();i.hasNext();){
297             DataFile dataFile=(DataFile) i.next();
298             result&=dataFile.delete();
299         }
300         fileMap.clear();
301         lastAppendLocation.set(null);
302         mark=null;
303         currentWriteFile=null;
304         
305         // reopen open file handles...
306
accessorPool = new DataFileAccessorPool(this);
307         if( useNio) {
308             appender = new NIODataFileAppender(this);
309         } else {
310             appender = new DataFileAppender(this);
311         }
312         return result;
313     }
314     
315     public synchronized void addInterestInFile(int file) throws IOException JavaDoc{
316         if(file>=0){
317             Integer JavaDoc key=new Integer JavaDoc(file);
318             DataFile dataFile=(DataFile) fileMap.get(key);
319             if(dataFile==null){
320                 throw new IOException JavaDoc("That data file does not exist");
321             }
322             addInterestInFile(dataFile);
323         }
324     }
325
326     synchronized void addInterestInFile(DataFile dataFile){
327         if(dataFile!=null){
328             dataFile.increment();
329         }
330     }
331
332     public synchronized void removeInterestInFile(int file) throws IOException JavaDoc{
333         if(file>=0){
334             Integer JavaDoc key=new Integer JavaDoc(file);
335             DataFile dataFile=(DataFile) fileMap.get(key);
336             removeInterestInFile(dataFile);
337         }
338     }
339
340     synchronized void removeInterestInFile(DataFile dataFile) throws IOException JavaDoc{
341         if(dataFile!=null){
342             if(dataFile.decrement()<=0){
343                 removeDataFile(dataFile);
344             }
345         }
346     }
347     
348     
349     synchronized public void consolidateDataFilesNotIn(Set JavaDoc<Integer JavaDoc> inUse) throws IOException JavaDoc {
350         
351         // Substract and the difference is the set of files that are no longer needed :)
352
Set JavaDoc<Integer JavaDoc> unUsed = new HashSet JavaDoc<Integer JavaDoc>(fileMap.keySet());
353         unUsed.removeAll(inUse);
354         
355         List JavaDoc<DataFile> purgeList=new ArrayList JavaDoc<DataFile>();
356         for (Integer JavaDoc key : unUsed) {
357             DataFile dataFile=(DataFile) fileMap.get(key);
358             purgeList.add(dataFile);
359         }
360         
361         for (DataFile dataFile : purgeList) {
362             removeDataFile(dataFile);
363         }
364     }
365
366     public synchronized void consolidateDataFiles() throws IOException JavaDoc{
367         List JavaDoc<DataFile> purgeList=new ArrayList JavaDoc<DataFile>();
368         for (DataFile dataFile : fileMap.values()) {
369             if( dataFile.isUnused() ){
370                 purgeList.add(dataFile);
371             }
372         }
373         for (DataFile dataFile : purgeList) {
374             removeDataFile(dataFile);
375         }
376     }
377
378     private void removeDataFile(DataFile dataFile) throws IOException JavaDoc{
379
380         // Make sure we don't delete too much data.
381
if( dataFile==currentWriteFile || mark==null || dataFile.getDataFileId() >= mark.getDataFileId() ) {
382             return;
383         }
384
385         accessorPool.disposeDataFileAccessors(dataFile);
386         
387         fileMap.remove(dataFile.getDataFileId());
388         dataFile.unlink();
389         boolean result=dataFile.delete();
390         log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
391         
392     }
393         
394
395     /**
396      * @return the maxFileLength
397      */

398     public int getMaxFileLength(){
399         return maxFileLength;
400     }
401
402     /**
403      * @param maxFileLength the maxFileLength to set
404      */

405     public void setMaxFileLength(int maxFileLength){
406         this.maxFileLength=maxFileLength;
407     }
408     
409     public String JavaDoc toString(){
410         return "DataManager:("+filePrefix+")";
411     }
412
413     public synchronized Location getMark() throws IllegalStateException JavaDoc {
414         return mark;
415     }
416
417     public Location getNextLocation(Location location) throws IOException JavaDoc, IllegalStateException JavaDoc {
418             
419             
420             Location cur = null;
421             while( true ) {
422                 if( cur == null ) {
423                     if( location == null ) {
424                         DataFile head = (DataFile) currentWriteFile.getHeadNode();
425                         cur = new Location();
426                         cur.setDataFileId(head.getDataFileId());
427                         cur.setOffset(0);
428                         
429 // DataFileAccessor reader = accessorPool.openDataFileAccessor(head);
430
// try {
431
// if( !reader.readLocationDetailsAndValidate(cur) ) {
432
// return null;
433
// }
434
// } finally {
435
// accessorPool.closeDataFileAccessor(reader);
436
// }
437
} else {
438                         // Set to the next offset..
439
cur = new Location(location);
440                         cur.setOffset(cur.getOffset()+cur.getSize());
441                     }
442                 } else {
443                     cur.setOffset(cur.getOffset()+cur.getSize());
444                 }
445                 
446                 DataFile dataFile = getDataFile(cur);
447                 
448                 // Did it go into the next file??
449
if( dataFile.getLength() <= cur.getOffset() ) {
450                     dataFile = getNextDataFile(dataFile);
451                     if( dataFile == null ) {
452                         return null;
453                     } else {
454                         cur.setDataFileId(dataFile.getDataFileId().intValue());
455                         cur.setOffset(0);
456                     }
457                 }
458                 
459                 // Load in location size and type.
460
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
461                 try {
462                     reader.readLocationDetails(cur);
463                 } finally {
464                     accessorPool.closeDataFileAccessor(reader);
465                 }
466                 
467                 if( cur.getType() == 0 ) {
468                     return null;
469                 } else if( cur.getType() > 0 ) {
470                     // Only return user records.
471
return cur;
472                 }
473             }
474     }
475
476     public ByteSequence read(Location location) throws IOException JavaDoc, IllegalStateException JavaDoc {
477         DataFile dataFile = getDataFile(location);
478         DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
479         ByteSequence rc=null;
480         try {
481             rc = reader.readRecord(location);
482         } finally {
483             accessorPool.closeDataFileAccessor(reader);
484         }
485         return rc;
486     }
487
488     public void setMark(Location location, boolean sync) throws IOException JavaDoc, IllegalStateException JavaDoc {
489         synchronized(this) {
490             mark = location;
491         }
492         storeState(sync);
493     }
494
495     private void storeState(boolean sync) throws IOException JavaDoc {
496         ByteSequence state = marshallState();
497         appender.storeItem(state, Location.MARK_TYPE, sync);
498         controlFile.store(state, sync);
499     }
500
501     public Location write(ByteSequence data, boolean sync) throws IOException JavaDoc, IllegalStateException JavaDoc {
502         return appender.storeItem(data, Location.USER_TYPE, sync);
503     }
504     
505     public Location write(ByteSequence data, byte type, boolean sync) throws IOException JavaDoc, IllegalStateException JavaDoc {
506         return appender.storeItem(data, type, sync);
507     }
508
509     public void update(Location location, ByteSequence data, boolean sync) throws IOException JavaDoc {
510         DataFile dataFile = getDataFile(location);
511         DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
512         try {
513             updater.updateRecord(location, data, sync);
514         } finally {
515             accessorPool.closeDataFileAccessor(updater);
516         }
517     }
518
519     public File JavaDoc getDirectory() {
520         return directory;
521     }
522
523     public void setDirectory(File JavaDoc directory) {
524         this.directory = directory;
525     }
526
527     public String JavaDoc getFilePrefix() {
528         return filePrefix;
529     }
530
531     public void setFilePrefix(String JavaDoc filePrefix) {
532         this.filePrefix = filePrefix;
533     }
534
535     public ConcurrentHashMap JavaDoc<WriteKey, WriteCommand> getInflightWrites() {
536         return inflightWrites;
537     }
538
539     public Location getLastAppendLocation() {
540         return lastAppendLocation.get();
541     }
542
543     public void setLastAppendLocation(Location lastSyncedLocation) {
544         this.lastAppendLocation.set(lastSyncedLocation);
545     }
546
547 }
548
Popular Tags