1 22 package org.jboss.ejb.plugins.cmp.jdbc2.schema; 23 24 import org.jboss.system.ServiceMBeanSupport; 25 26 import javax.transaction.Transaction ; 27 28 33 public class PartitionedTableCache 34 extends ServiceMBeanSupport 35 implements Cache, PartitionedTableCacheMBean 36 { 37 private Cache.Listener listener = Cache.Listener.NOOP; 38 39 private final int minCapacity; 40 private final int minPartitionCapacity; 41 private int maxCapacity; 42 private int maxPartitionCapacity; 43 44 private final TableCache[] partitions; 45 46 private Overager overager; 47 48 public PartitionedTableCache(int minCapacity, int maxCapacity, int partitionsTotal) 49 { 50 this.minCapacity = minCapacity; 51 this.maxCapacity = maxCapacity; 52 53 minPartitionCapacity = minCapacity / partitionsTotal + 1; 54 maxPartitionCapacity = maxCapacity / partitionsTotal + 1; 55 partitions = new TableCache[partitionsTotal]; 56 for(int i = 0; i < partitions.length; ++i) 57 { 58 partitions[i] = new TableCache(i, minPartitionCapacity, maxPartitionCapacity); 59 } 60 61 if(log.isTraceEnabled()) 62 { 63 log.trace("min-capacity=" + minCapacity + ", max-capacity=" + maxCapacity + ", partitions=" + partitionsTotal); 64 } 65 } 66 67 public void stopService() 68 { 69 if(overager != null) 70 { 71 overager.stop(); 72 } 73 } 74 75 public void initOverager(long period, long maxAge, String threadName) 76 { 77 final long periodMs = period * 1000; 78 final long maxAgeMs = maxAge * 1000; 79 overager = new Overager(maxAgeMs, periodMs); 80 new Thread (overager, threadName).start(); 81 } 82 83 86 public void registerListener(Cache.Listener listener) 87 { 88 if(log.isTraceEnabled()) 89 { 90 log.trace("registered listener for " + getServiceName()); 91 } 92 93 this.listener = listener; 94 for(int i = 0; i < partitions.length; ++i) 95 { 96 partitions[i].registerListener(listener); 97 } 98 } 99 100 103 public int size() 104 { 105 int size = 0; 106 for(int i = 0; i < partitions.length; ++i) 107 { 108 size += partitions[i].size(); 109 } 110 return size; 111 } 112 113 116 public int getMaxCapacity() 117 { 118 return maxCapacity; 119 } 120 121 124 public void setMaxCapacity(int maxCapacity) 125 { 126 this.maxCapacity = maxCapacity; 127 this.maxPartitionCapacity = maxCapacity / partitions.length + 1; 128 for(int i = 0; i < partitions.length; ++i) 129 { 130 partitions[i].setMaxCapacity(maxPartitionCapacity); 131 } 132 } 133 134 137 public int getMinCapacity() 138 { 139 return minCapacity; 140 } 141 142 145 public int getPartitionsTotal() 146 { 147 return partitions.length; 148 } 149 150 153 public int getMinPartitionCapacity() 154 { 155 return minPartitionCapacity; 156 } 157 158 161 public int getMaxPartitionCapacity() 162 { 163 return maxPartitionCapacity; 164 } 165 166 public void lock() 167 { 168 } 169 170 public void lock(Object key) 171 { 172 int partitionIndex = getPartitionIndex(key); 173 partitions[partitionIndex].lock(key); 174 } 175 176 public void unlock() 177 { 178 } 179 180 public void unlock(Object key) 181 { 182 int partitionIndex = getPartitionIndex(key); 183 partitions[partitionIndex].unlock(key); 184 } 185 186 public Object [] getFields(Object pk) 187 { 188 final int i = getPartitionIndex(pk); 189 return partitions[i].getFields(pk); 190 } 191 192 public Object [] getRelations(Object pk) 193 { 194 final int i = getPartitionIndex(pk); 195 return partitions[i].getRelations(pk); 196 } 197 198 public void put(Transaction tx, Object pk, Object [] fields, Object [] relations) 199 { 200 final int i = getPartitionIndex(pk); 201 partitions[i].put(tx, pk, fields, relations); 202 } 203 204 public void remove(Transaction tx, Object pk) 205 { 206 final int i = getPartitionIndex(pk); 207 partitions[i].remove(tx, pk); 208 } 209 210 public boolean contains(Transaction tx, Object pk) 211 { 212 final int i = getPartitionIndex(pk); 213 return partitions[i].contains(tx, pk); 214 } 215 216 public void lockForUpdate(Transaction tx, Object pk) throws Exception 217 { 218 final int i = getPartitionIndex(pk); 219 partitions[i].lockForUpdate(tx, pk); 220 } 221 222 public void releaseLock(Transaction tx, Object pk) throws Exception 223 { 224 final int i = getPartitionIndex(pk); 225 partitions[i].releaseLock(tx, pk); 226 } 227 228 public void flush() 229 { 230 for(int i = 0; i < partitions.length; ++i) 231 { 232 final TableCache partition = partitions[i]; 233 partition.lock(); 234 try 235 { 236 partition.flush(); 237 } 238 finally 239 { 240 partition.unlock(); 241 } 242 } 243 } 244 245 247 private int getPartitionIndex(Object key) 248 { 249 return Math.abs(key.hashCode()) % partitions.length; 250 } 251 252 254 private class Overager implements Runnable 255 { 256 private final long maxAgeMs; 257 private final long periodMs; 258 private boolean run = true; 259 260 public Overager(long maxAgeMs, long periodMs) 261 { 262 this.maxAgeMs = maxAgeMs; 263 this.periodMs = periodMs; 264 } 265 266 public void stop() 267 { 268 run = false; 269 } 270 271 public void run() 272 { 273 while(run) 274 { 275 long lastUpdated = System.currentTimeMillis() - maxAgeMs; 276 for(int i = 0; i < partitions.length; ++i) 277 { 278 partitions[i].ageOut(lastUpdated); 279 } 280 281 try 282 { 283 Thread.sleep(periodMs); 284 } 285 catch(InterruptedException e) 286 { 287 } 288 } 289 } 290 } 291 } 292 | Popular Tags |