1 29 30 package com.caucho.db.store; 31 32 import com.caucho.log.Log; 33 import com.caucho.management.server.AbstractManagedObject; 34 import com.caucho.management.server.BlockManagerMXBean; 35 import com.caucho.util.L10N; 36 import com.caucho.util.LongKeyLruCache; 37 38 import java.io.IOException ; 39 import java.util.ArrayList ; 40 import java.util.Iterator ; 41 import java.util.logging.Level ; 42 import java.util.logging.Logger ; 43 44 47 public final class BlockManager 48 extends AbstractManagedObject 49 implements BlockManagerMXBean 50 { 51 private static final Logger log = Log.open(BlockManager.class); 52 private static final L10N L = new L10N(BlockManager.class); 53 54 private static BlockManager _staticManager; 55 56 private final byte []_storeMask = new byte[8192]; 57 private final LongKeyLruCache<Block> _blockCache; 58 59 private final ArrayList <Block> _writeQueue = new ArrayList <Block>(); 60 private int _writeQueueMax = 32; 61 62 private BlockManager(int capacity) 63 { 64 _blockCache = new LongKeyLruCache<Block>(capacity); 65 66 _storeMask[0] |= 1; 68 69 registerSelf(); 70 71 BlockManagerWriter writer = new BlockManagerWriter(); 72 Thread thread = new Thread (writer, "block-manager-writer"); 73 thread.setDaemon(true); 74 75 thread.start(); 76 } 77 78 81 public static synchronized BlockManager create(int minEntries) 82 { 83 if (_staticManager == null) 84 _staticManager = new BlockManager(minEntries); 85 86 _staticManager.ensureCapacity(minEntries); 87 88 return _staticManager; 89 } 90 91 public static BlockManager getBlockManager() 92 { 93 return _staticManager; 94 } 95 96 101 public void ensureCapacity(int minCapacity) 102 { 103 _blockCache.ensureCapacity(minCapacity); 104 } 105 106 109 public int allocateStoreId() 110 { 111 synchronized (_storeMask) { 112 for (int i = 0; i < _storeMask.length; i++) { 113 int mask = _storeMask[i]; 114 115 if (mask != 0xff) { 116 for (int j = 0; j < 8; j++) { 117 if ((mask & (1 << j)) == 0) { 118 _storeMask[i] |= (1 << j); 119 120 return 8 * i + j; 121 } 122 } 123 } 124 } 125 126 throw new IllegalStateException (L.l("All store ids used.")); 127 } 128 } 129 130 133 public void flush(Store store) 134 { 135 ArrayList <Block> dirtyBlocks = null; 136 137 synchronized (_blockCache) { 138 Iterator <Block> values = _blockCache.values(); 139 140 while (values.hasNext()) { 141 Block block = values.next(); 142 143 if (block != null && block.getStore() == store) { 144 if (block.isDirty()) { 145 if (dirtyBlocks == null) 146 dirtyBlocks = new ArrayList <Block>(); 147 148 dirtyBlocks.add(block); 149 } 150 } 151 } 152 } 153 154 for (int i = 0; dirtyBlocks != null && i < dirtyBlocks.size(); i++) { 155 Block block = dirtyBlocks.get(i); 156 157 try { 158 synchronized (block) { 159 block.write(); 160 } 161 } catch (IOException e) { 162 log.log(Level.FINER, e.toString(), e); 163 } 164 } 165 } 166 167 170 public void freeStore(Store store) 171 { 172 ArrayList <Block> removeBlocks = new ArrayList <Block>(); 173 174 synchronized (_blockCache) { 175 Iterator <Block> iter = _blockCache.values(); 176 177 while (iter.hasNext()) { 178 Block block = iter.next(); 179 180 if (block != null && block.getStore() == store) 181 removeBlocks.add(block); 182 } 183 } 184 185 for (Block block : removeBlocks) { 186 _blockCache.remove(block.getBlockId()); 187 } 188 189 synchronized (_writeQueue) { 190 while (_writeQueue.size() > 0) { 191 try { 192 _writeQueue.wait(); 193 } catch (InterruptedException e) { 194 } 195 } 196 } 197 } 198 199 202 public void freeStoreId(int storeId) 203 { 204 synchronized (_storeMask) { 205 if (storeId <= 0) 206 throw new IllegalArgumentException (String.valueOf(storeId)); 207 208 _storeMask[storeId / 8] &= ~(1 << storeId % 8); 209 } 210 } 211 212 215 Block getBlock(Store store, long blockId) 216 { 217 220 Block block = _blockCache.get(blockId); 221 222 while (block == null || ! block.allocate()) { 223 Block dirtyBlock = null; 225 synchronized (_writeQueue) { 226 int size = _writeQueue.size(); 227 228 for (int i = 0; i < size; i++) { 229 dirtyBlock = _writeQueue.get(i); 230 231 if (dirtyBlock.getBlockId() == blockId && dirtyBlock.allocate()) { 232 break; 233 } 234 else 235 dirtyBlock = null; 236 } 237 } 238 239 if ((blockId & Store.BLOCK_MASK) == 0) 240 throw stateError(L.l("Block 0 is reserved.")); 241 242 block = new ReadBlock(store, blockId); 243 244 if (dirtyBlock != null) { 245 byte []dirtyBuffer = dirtyBlock.getBuffer(); 246 247 if (dirtyBuffer != null) { 248 System.arraycopy(dirtyBuffer, 0, block.getBuffer(), 0, dirtyBuffer.length); 249 block.validate(); 250 } 251 252 dirtyBlock.free(); 253 } 254 255 block = _blockCache.putIfNew(blockId, block); 258 } 259 260 return block; 261 } 262 263 266 void addLruDirtyWriteBlock(Block block) 267 { 268 synchronized (_writeQueue) { 269 while (_writeQueueMax < _writeQueue.size()) { 270 try { 271 _writeQueue.wait(); 272 } catch (InterruptedException e) { 273 } 274 } 275 276 _writeQueue.add(block); 277 278 _writeQueue.notifyAll(); 279 } 280 } 281 282 286 289 public String getName() 290 { 291 return null; 292 } 293 294 297 public String getType() 298 { 299 return "BlockManager"; 300 } 301 302 305 public long getBlockCapacity() 306 { 307 return _blockCache.getCapacity(); 308 } 309 310 313 public long getHitCountTotal() 314 { 315 return _blockCache.getHitCount(); 316 } 317 318 321 public long getMissCountTotal() 322 { 323 return _blockCache.getMissCount(); 324 } 325 326 private static IllegalStateException stateError(String msg) 327 { 328 IllegalStateException e = new IllegalStateException (msg); 329 e.fillInStackTrace(); 330 log.log(Level.WARNING, e.toString(), e); 331 return e; 332 } 333 334 class BlockManagerWriter implements Runnable { 335 public void run() 336 { 337 while (true) { 338 try { 339 Block block = null; 340 341 synchronized (_writeQueue) { 342 loop: 343 while (true) { 344 for (int i = 0; i < _writeQueue.size(); i++) { 345 block = _writeQueue.get(i); 346 347 if (block.isFree()) 348 break loop; 349 else 350 block = null; 351 } 352 353 if (_writeQueue.size() == 0) 354 _writeQueue.wait(); 355 else 356 _writeQueue.wait(10000); 357 } 358 } 359 360 block.close(); 361 362 synchronized (_writeQueue) { 363 for (int i = 0; i < _writeQueue.size(); i++) { 364 if (block == _writeQueue.get(i)) { 365 _writeQueue.remove(i); 366 break; 367 } 368 } 369 370 _writeQueue.notifyAll(); 371 } 372 } catch (InterruptedException e) { 373 } catch (Throwable e) { 374 log.log(Level.WARNING, e.toString(), e); 375 } 376 } 377 } 378 } 379 } 380 | Popular Tags |