1 2 12 package com.versant.core.metric; 13 14 import com.versant.core.common.Debug; 15 16 import java.util.*; 17 18 import com.versant.core.common.BindingSupportImpl; 19 20 26 public final class MetricSnapshotStore implements Runnable { 27 28 private HasMetrics[] sources; 29 private BaseMetric[] baseMetrics; 30 private List otherMetrics = new ArrayList(); 31 private Metric[] all; 32 private int capacity; 33 private Date[] dates; private int[] ids; private int[][] buf; 36 private int pos; private int count; 38 private boolean locked; 39 private int lastID; 40 41 private Thread snapshotThread; 42 private boolean run; 43 private int sampleIntervalMs; 44 private long nextSampleTime; 45 46 public MetricSnapshotStore(int capacity, int sampleIntervalMs) { 47 this.capacity = capacity; 48 this.sampleIntervalMs = sampleIntervalMs; 49 dates = new Date[capacity]; 50 ids = new int[capacity]; 51 } 52 53 56 public void addSource(HasMetrics source) { 57 lock(); 58 try { 59 if (sources == null) { 60 sources = new HasMetrics[]{source}; 61 } else { 62 HasMetrics[] a = new HasMetrics[sources.length + 1]; 63 System.arraycopy(sources, 0, a, 0, sources.length); 64 a[sources.length] = source; 65 sources = a; 66 } 67 68 ArrayList list = new ArrayList(); 69 source.addMetrics(list); 70 71 int n = list.size(); 73 ArrayList base = new ArrayList(n); 74 for (int i = 0; i < n; i++) { 75 Object o = list.get(i); 76 if (o instanceof BaseMetric) { 77 base.add(o); 78 } 79 } 80 Collections.sort(base); 81 int firstNewBase; 82 int baseSize = base.size(); 83 if (baseMetrics == null || baseMetrics.length == 0) { 84 firstNewBase = 0; 85 baseMetrics = new BaseMetric[baseSize]; 86 base.toArray(baseMetrics); 87 } else { 88 firstNewBase = baseMetrics.length; 89 BaseMetric[] a = new BaseMetric[firstNewBase + baseSize]; 90 System.arraycopy(baseMetrics, 0, a, 0, firstNewBase); 91 for (int i = 0; i < baseSize; i++) { 92 a[firstNewBase + i] = (BaseMetric)base.get(i); 93 } 94 baseMetrics = a; 95 } 96 for (int i = firstNewBase; i < baseMetrics.length; i++) { 97 baseMetrics[i].setIndex(i); 98 } 99 100 if (buf == null) { 102 buf = new int[baseMetrics.length][]; 103 } else { 104 int[][] a = new int[baseMetrics.length][]; 105 System.arraycopy(buf, 0, a, 0, firstNewBase); 106 buf = a; 107 } 108 for (int i = firstNewBase; i < buf.length; i++) { 109 buf[i] = new int[capacity]; 110 } 111 112 ArrayList other = new ArrayList(); 114 for (int i = 0; i < n; i++) { 115 Object o = list.get(i); 116 if (!(o instanceof BaseMetric)) { 117 other.add(o); 118 } 119 } 120 Collections.sort(other); 121 otherMetrics.addAll(other); 122 int otherSize = otherMetrics.size(); 123 all = new Metric[baseMetrics.length + otherSize]; 124 System.arraycopy(baseMetrics, 0, all, 0, baseMetrics.length); 125 for (int i = 0; i < otherSize; i++) { 126 all[baseMetrics.length + i] = (Metric)otherMetrics.get(i); 127 } 128 } finally { 129 unlock(); 130 } 131 } 132 133 136 public void start(String id) { 137 if (snapshotThread == null) { 138 run = true; 139 snapshotThread = new Thread (this, "VOA Metric Store " + id); 140 snapshotThread.setDaemon(true); 141 snapshotThread.start(); 142 } 143 } 144 145 153 private int beginUpdate() { 154 lock(); 155 ids[pos] = ++lastID; 156 dates[pos] = new Date(); 157 return pos; 158 } 159 160 164 private void endUpdate() { 165 pos = (pos + 1) % capacity; 166 if (count < capacity) count++; 167 unlock(); 168 } 169 170 173 private synchronized void lock() { 174 for (; locked; ) { 175 try { 176 wait(); 177 } catch (InterruptedException e) { 178 } 180 } 181 locked = true; 182 } 183 184 187 private synchronized void unlock() { 188 if (Debug.DEBUG) { 189 if (!locked) { 190 throw BindingSupportImpl.getInstance().internal( 191 "unlock() called with locked == false"); 192 } 193 } 194 locked = false; 195 notify(); 196 } 197 198 public int getCapacity() { 199 return capacity; 200 } 201 202 205 public void setCapacity(int max) { 206 lock(); 207 try { 208 MetricSnapshotPacket old = getNewSnapshotsImp(0); 209 capacity = max; 210 buf = new int[baseMetrics.length][]; 211 for (int i = 0; i < baseMetrics.length; i++) buf[i] = new int[capacity]; 212 dates = new Date[capacity]; 213 ids = new int[capacity]; 214 if (old != null) { 215 int n = old.getSize(); 216 if (n > max) n = max; 217 int first = old.getSize() - n; 218 for (int i = 0; i < baseMetrics.length; i++) { 219 System.arraycopy(old.getBuf()[i], first, buf[i], 0, n); 220 } 221 System.arraycopy(old.getDates(), first, dates, 0, n); 222 System.arraycopy(old.getIds(), first, ids, 0, n); 223 count = n; 224 pos = n % capacity; 225 } else { 226 count = 0; 227 pos = 0; 228 } 229 } finally { 230 unlock(); 231 } 232 } 233 234 238 public MetricSnapshotPacket getNewSnapshots(int id) { 239 lock(); 240 try { 241 return getNewSnapshotsImp(id); 242 } finally { 243 unlock(); 244 } 245 } 246 247 private MetricSnapshotPacket getNewSnapshotsImp(int id) { 248 if (count < capacity) { 249 int first; 250 for (first = pos - 1; first >= 0; first--) { 251 if (ids[first] == id) break; 252 } 253 first++; 254 int n = pos - first; 255 if (n == 0) return null; 256 int[][] sbuf = new int[baseMetrics.length][]; 257 for (int i = 0; i < baseMetrics.length; i++) { 258 sbuf[i] = new int[n]; 259 System.arraycopy(buf[i], first, sbuf[i], 0, n); 260 } 261 Date[] dbuf = new Date[n]; 262 System.arraycopy(dates, first, dbuf, 0, n); 263 int[] ibuf = new int[n]; 264 System.arraycopy(ids, first, ibuf, 0, n); 265 return new MetricSnapshotPacket(dbuf, ibuf, sbuf); 266 } else { 267 if (ids[(pos + capacity - 1) % capacity] == id) return null; 268 int first = pos; 269 int c = capacity; 270 for (; c > 0; first = (first + 1) % capacity, c--) { 271 if (ids[first] == id) break; 272 } 273 first = (first + 1) % capacity; 274 if (first >= pos) { 275 int h1 = capacity - first; 276 int h2 = pos; 277 int n = h1 + h2; 278 int[][] sbuf = new int[baseMetrics.length][]; 279 for (int i = 0; i < baseMetrics.length; i++) { 280 sbuf[i] = new int[n]; 281 System.arraycopy(buf[i], first, sbuf[i], 0, h1); 282 System.arraycopy(buf[i], 0, sbuf[i], h1, h2); 283 } 284 Date[] dbuf = new Date[n]; 285 System.arraycopy(dates, first, dbuf, 0, h1); 286 System.arraycopy(dates, 0, dbuf, h1, h2); 287 int[] ibuf = new int[n]; 288 System.arraycopy(ids, first, ibuf, 0, h1); 289 System.arraycopy(ids, 0, ibuf, h1, h2); 290 return new MetricSnapshotPacket(dbuf, ibuf, sbuf); 291 } else { 292 int n = pos - first; 293 int[][] sbuf = new int[baseMetrics.length][]; 294 for (int i = 0; i < baseMetrics.length; i++) { 295 sbuf[i] = new int[n]; 296 System.arraycopy(buf[i], first, sbuf[i], 0, n); 297 } 298 Date[] dbuf = new Date[n]; 299 System.arraycopy(dates, first, dbuf, 0, n); 300 int[] ibuf = new int[n]; 301 System.arraycopy(ids, first, ibuf, 0, n); 302 return new MetricSnapshotPacket(dbuf, ibuf, sbuf); 303 } 304 } 305 } 306 307 310 public MetricSnapshotPacket getMostRecentSnapshot(int lastId) { 311 lock(); 312 try { 313 if (count == 0) return null; 314 int p = (pos + capacity - 1) % capacity; 315 if (ids[p] == lastId) return null; 316 int[][] sbuf = new int[baseMetrics.length][]; 317 for (int i = 0; i < baseMetrics.length; i++) { 318 sbuf[i] = new int[]{buf[i][p]}; 319 } 320 return new MetricSnapshotPacket(new Date[]{dates[p]}, 321 new int[]{ids[p]}, sbuf); 322 } finally { 323 unlock(); 324 } 325 } 326 327 public int getSampleIntervalMs() { 328 return sampleIntervalMs; 329 } 330 331 public void setSampleIntervalMs(int sampleIntervalMs) { 332 if (sampleIntervalMs < 100) sampleIntervalMs = 100; 333 int diff = sampleIntervalMs - this.sampleIntervalMs; 334 if (diff != 0) { 335 this.sampleIntervalMs = sampleIntervalMs; 336 nextSampleTime += diff; 337 snapshotThread.interrupt(); 338 }; 339 } 340 341 public void shutdown() { 342 if (snapshotThread != null) { 343 run = false; 344 snapshotThread.interrupt(); 345 snapshotThread = null; 346 } 347 } 348 349 public void run() { 350 nextSampleTime = System.currentTimeMillis(); 351 nextSampleTime += sampleIntervalMs - nextSampleTime % sampleIntervalMs; 352 for (; run; ) { 353 long now = System.currentTimeMillis(); 354 long diff = nextSampleTime - now; 355 if (diff <= 0) { 356 if (sources != null) { 357 int pos = beginUpdate(); 358 try { 359 for (int i = 0; i < sources.length; i++) { 360 sources[i].sampleMetrics(buf, pos); 361 } 362 } finally { 363 endUpdate(); 364 } 365 } 366 for (;;) { 367 nextSampleTime += sampleIntervalMs; 368 diff = nextSampleTime - now; 369 if (diff > 0) break; 370 } 371 } 372 try { 373 Thread.sleep(diff); 374 } catch (InterruptedException e) { 375 } 377 } 378 } 379 380 public Metric[] getMetrics() { 381 lock(); 382 try { 383 return all; 384 } finally { 385 unlock(); 386 } 387 } 388 389 } 390 | Popular Tags |