1 18 package org.apache.activemq.kaha.impl.async; 19 20 import java.io.IOException ; 21 import java.util.ArrayList ; 22 import java.util.HashMap ; 23 import java.util.Iterator ; 24 25 30 public class DataFileAccessorPool { 31 32 private final AsyncDataManager dataManager; 33 private final HashMap <Integer , Pool> pools = new HashMap <Integer , Pool>(); 34 private boolean closed=false; 35 36 int MAX_OPEN_READERS_PER_FILE=5; 37 38 class Pool { 39 40 private final DataFile file; 41 private final ArrayList <DataFileAccessor> pool = new ArrayList <DataFileAccessor>(); 42 private boolean used; 43 private int openCounter; 44 private boolean disposed; 45 46 public Pool(DataFile file) { 47 this.file = file; 48 } 49 50 public DataFileAccessor openDataFileReader() throws IOException { 51 DataFileAccessor rc=null; 52 if( pool.isEmpty() ) { 53 rc = new DataFileAccessor(dataManager, file); 54 } else { 55 rc = (DataFileAccessor) pool.remove(pool.size()-1); 56 } 57 used=true; 58 openCounter++; 59 return rc; 60 } 61 62 public void closeDataFileReader(DataFileAccessor reader) { 63 openCounter--; 64 used=true; 65 if(pool.size() >= MAX_OPEN_READERS_PER_FILE || disposed) { 66 reader.dispose(); 67 } else { 68 pool.add(reader); 69 } 70 } 71 72 public void clearUsedMark() { 73 used=false; 74 } 75 76 public boolean isUsed() { 77 return used; 78 } 79 80 public void dispose() { 81 for (DataFileAccessor reader : pool) { 82 reader.dispose(); 83 } 84 pool.clear(); 85 disposed=true; 86 } 87 88 public int getOpenCounter() { 89 return openCounter; 90 } 91 92 } 93 94 public DataFileAccessorPool(AsyncDataManager dataManager){ 95 this.dataManager=dataManager; 96 } 97 98 synchronized void clearUsedMark() { 99 for (Iterator iter = pools.values().iterator(); iter.hasNext();) { 100 Pool pool = (Pool) iter.next(); 101 pool.clearUsedMark(); 102 } 103 } 104 105 synchronized void disposeUnused() { 106 for (Iterator <Pool> iter = pools.values().iterator(); iter.hasNext();) { 107 Pool pool = iter.next(); 108 if( !pool.isUsed() ) { 109 pool.dispose(); 110 iter.remove(); 111 } 112 } 113 } 114 115 synchronized void disposeDataFileAccessors(DataFile dataFile) { 116 if( closed ) { 117 throw new IllegalStateException ("Closed."); 118 } 119 Pool pool = pools.get(dataFile.getDataFileId()); 120 if( pool != null ) { 121 if( pool.getOpenCounter()==0 ) { 122 pool.dispose(); 123 pools.remove(dataFile.getDataFileId()); 124 } else { 125 throw new IllegalStateException ("The data file is still in use: "+dataFile+", use count: "+pool.getOpenCounter()); 126 } 127 } 128 } 129 130 synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException { 131 if( closed ) { 132 throw new IOException ("Closed."); 133 } 134 135 Pool pool = pools.get(dataFile.getDataFileId()); 136 if( pool == null ) { 137 pool = new Pool(dataFile); 138 pools.put(dataFile.getDataFileId(), pool); 139 } 140 return pool.openDataFileReader(); 141 } 142 143 synchronized void closeDataFileAccessor(DataFileAccessor reader) { 144 Pool pool = pools.get(reader.getDataFile().getDataFileId()); 145 if( pool == null || closed ) { 146 reader.dispose(); 147 } else { 148 pool.closeDataFileReader(reader); 149 } 150 } 151 152 synchronized public void close() { 153 if(closed) 154 return; 155 closed=true; 156 for (Iterator <Pool> iter = pools.values().iterator(); iter.hasNext();) { 157 Pool pool = iter.next(); 158 pool.dispose(); 159 } 160 pools.clear(); 161 } 162 163 } 164 | Popular Tags |