package org.jrobin.core;

import java.io.IOException;

/**
 * A single archive of a round robin database.
 *
 * <p>An archive holds a consolidation function name, an xff value, a number of
 * steps and a number of rows. For every datasource reported by the parent
 * database header it also owns one {@link ArcState} (the running accumulator)
 * and one {@link Robin} (the stored values).
 */
public class Archive implements RrdUpdater {
    private RrdDb parentDb;
    private RrdString consolFun;
    private RrdDouble xff;
    private RrdInt steps, rows;
    private Robin[] robins;
    private ArcState[] states;

    /**
     * Creates an archive attached to the given database.
     *
     * @param parentDb database this archive belongs to
     * @param arcDef   archive definition whose values are written into this archive;
     *                 when {@code null} nothing is written and the per-datasource
     *                 {@link ArcState}/{@link Robin} objects are created with their
     *                 "initialize" flag set to {@code false}
     *                 (NOTE(review): presumably the values then come from existing storage - confirm)
     * @throws IOException if the underlying storage cannot be accessed
     */
    Archive(RrdDb parentDb, ArcDef arcDef) throws IOException {
        boolean shouldInitialize = arcDef != null;
        this.parentDb = parentDb;
        consolFun = new RrdString(this);
        xff = new RrdDouble(this);
        steps = new RrdInt(this);
        rows = new RrdInt(this);
        if (shouldInitialize) {
            consolFun.set(arcDef.getConsolFun());
            xff.set(arcDef.getXff());
            steps.set(arcDef.getSteps());
            rows.set(arcDef.getRows());
        }
        int n = parentDb.getHeader().getDsCount();
        states = new ArcState[n];
        robins = new Robin[n];
        for (int i = 0; i < n; i++) {
            states[i] = new ArcState(this, shouldInitialize);
            robins[i] = new Robin(this, rows.get(), shouldInitialize);
        }
    }

    /**
     * Creates an archive from imported data.
     *
     * <p>The archive is first initialized from an {@link ArcDef} built out of the
     * importer's values, then the accumulated value, the NaN step count and the
     * robin values of every datasource are copied from the importer.
     *
     * @param parentDb database this archive belongs to
     * @param reader   source of the imported archive data
     * @param arcIndex index of the archive inside the importer
     * @throws IOException  if the underlying storage cannot be accessed
     * @throws RrdException if the imported definition or data is rejected
     */
    Archive(RrdDb parentDb, DataImporter reader, int arcIndex) throws IOException, RrdException {
        this(parentDb, new ArcDef(
                reader.getConsolFun(arcIndex), reader.getXff(arcIndex),
                reader.getSteps(arcIndex), reader.getRows(arcIndex)));
        int n = parentDb.getHeader().getDsCount();
        for (int i = 0; i < n; i++) {
            // restore the running accumulator of this datasource
            states[i].setAccumValue(reader.getStateAccumValue(arcIndex, i));
            states[i].setNanSteps(reader.getStateNanSteps(arcIndex, i));
            // restore the stored values of this datasource
            double[] values = reader.getValues(arcIndex, i);
            robins[i].update(values);
        }
    }

    /**
     * Returns the archive time step: the header step multiplied by the number
     * of steps of this archive.
     *
     * @return archive time step
     * @throws IOException if the underlying storage cannot be accessed
     */
    public long getArcStep() throws IOException {
        long step = parentDb.getHeader().getStep();
        return step * steps.get();
    }

    /**
     * Builds a textual dump of this archive: its definition, its time interval
     * and the dumps of every state and robin.
     *
     * @return dump text
     * @throws IOException if the underlying storage cannot be accessed
     */
    String dump() throws IOException {
        StringBuffer buffer = new StringBuffer("== ARCHIVE ==\n");
        buffer.append("RRA:").append(consolFun.get())
                .append(":").append(xff.get())
                .append(":").append(steps.get())
                .append(":").append(rows.get())
                .append("\n");
        buffer.append("interval [").append(getStartTime())
                .append(", ").append(getEndTime())
                .append("]").append("\n");
        for (int i = 0; i < robins.length; i++) {
            buffer.append(states[i].dump());
            buffer.append(robins[i].dump());
        }
        return buffer.toString();
    }

    /**
     * Returns the database this archive belongs to.
     *
     * @return parent database
     */
    RrdDb getParentDb() {
        return parentDb;
    }

    /**
     * Feeds {@code numUpdates} repetitions of {@code value} into the archive
     * for one datasource.
     *
     * <p>The work is done in three phases: values are accumulated until the
     * current archive step is complete and finalized, then whole archive steps
     * are written in bulk, and finally the leftover updates are accumulated
     * into the (new) current step.
     *
     * @param dsIndex    index of the datasource
     * @param value      value to archive (may be NaN)
     * @param numUpdates number of consecutive updates carrying {@code value}
     * @throws IOException if the underlying storage cannot be accessed
     */
    void archive(int dsIndex, double value, long numUpdates) throws IOException {
        Robin robin = robins[dsIndex];
        ArcState state = states[dsIndex];
        long step = parentDb.getHeader().getStep();
        long lastUpdateTime = parentDb.getHeader().getLastUpdateTime();
        long updateTime = Util.normalize(lastUpdateTime, step) + step;
        long arcStep = getArcStep();
        // phase 1: complete the archive step that is currently in progress
        while (numUpdates > 0) {
            accumulate(state, value);
            numUpdates--;
            if (updateTime % arcStep == 0) {
                finalizeStep(state, robin);
                break;
            }
            else {
                updateTime += step;
            }
        }
        // phase 2: store whole archive steps in bulk; never more than the robin has rows
        long stepsPerRow = steps.get();
        int bulkUpdateCount = (int) Math.min(numUpdates / stepsPerRow, (long) rows.get());
        robin.bulkStore(value, bulkUpdateCount);
        // phase 3: accumulate what does not fill a whole archive step
        long remainingUpdates = numUpdates % stepsPerRow;
        for (long i = 0; i < remainingUpdates; i++) {
            accumulate(state, value);
        }
    }

    /**
     * Merges one value into the running state. NaN values only increase the
     * NaN step counter; other values are combined with the accumulated value
     * according to the consolidation function (MIN, MAX, LAST or AVERAGE).
     * Any other consolidation function name leaves the state untouched.
     */
    private void accumulate(ArcState state, double value) throws IOException {
        if (Double.isNaN(value)) {
            state.setNanSteps(state.getNanSteps() + 1);
        }
        else {
            // read the consolidation function once instead of once per comparison
            String fun = consolFun.get();
            if (fun.equals("MIN")) {
                state.setAccumValue(Util.min(state.getAccumValue(), value));
            }
            else if (fun.equals("MAX")) {
                state.setAccumValue(Util.max(state.getAccumValue(), value));
            }
            else if (fun.equals("LAST")) {
                state.setAccumValue(value);
            }
            else if (fun.equals("AVERAGE")) {
                // AVERAGE keeps a running sum; the division happens in finalizeStep()
                state.setAccumValue(Util.sum(state.getAccumValue(), value));
            }
        }
    }

    /**
     * Closes the current archive step: stores the consolidated value (or NaN
     * when too many steps were NaN or nothing was accumulated) and resets the
     * running state.
     */
    private void finalizeStep(ArcState state, Robin robin) throws IOException {
        long arcSteps = steps.get();
        double arcXff = xff.get();
        long nanSteps = state.getNanSteps();
        double accumValue = state.getAccumValue();
        // the value is usable only if the share of NaN steps does not exceed xff
        if (nanSteps <= arcXff * arcSteps && !Double.isNaN(accumValue)) {
            if (consolFun.get().equals("AVERAGE")) {
                // average over the steps that actually carried a value
                accumValue /= (arcSteps - nanSteps);
            }
            robin.store(accumValue);
        }
        else {
            robin.store(Double.NaN);
        }
        state.setAccumValue(Double.NaN);
        state.setNanSteps(0);
    }

    /**
     * Returns the consolidation function name of this archive.
     *
     * @return consolidation function name
     * @throws IOException if the underlying storage cannot be accessed
     */
    public String getConsolFun() throws IOException {
        return consolFun.get();
    }

    /**
     * Returns the xff value of this archive.
     *
     * @return xff value
     * @throws IOException if the underlying storage cannot be accessed
     */
    public double getXff() throws IOException {
        return xff.get();
    }

    /**
     * Returns the number of steps of this archive.
     *
     * @return number of steps
     * @throws IOException if the underlying storage cannot be accessed
     */
    public int getSteps() throws IOException {
        return steps.get();
    }

    /**
     * Returns the number of rows of this archive.
     *
     * @return number of rows
     * @throws IOException if the underlying storage cannot be accessed
     */
    public int getRows() throws IOException {
        return rows.get();
    }

    /**
     * Returns the start time of this archive, computed as
     * {@code getEndTime() - (rows - 1) * getArcStep()}.
     *
     * @return archive start time
     * @throws IOException if the underlying storage cannot be accessed
     */
    public long getStartTime() throws IOException {
        long endTime = getEndTime();
        long arcStep = getArcStep();
        long numRows = rows.get();
        return endTime - (numRows - 1) * arcStep;
    }

    /**
     * Returns the end time of this archive: the header's last update time
     * passed through {@code Util.normalize} with the archive step.
     *
     * @return archive end time
     * @throws IOException if the underlying storage cannot be accessed
     */
    public long getEndTime() throws IOException {
        long arcStep = getArcStep();
        long lastUpdateTime = parentDb.getHeader().getLastUpdateTime();
        return Util.normalize(lastUpdateTime, arcStep);
    }

    /**
     * Returns the running state kept for the given datasource.
     *
     * @param dsIndex index of the datasource
     * @return state object of that datasource
     */
    public ArcState getArcState(int dsIndex) {
        return states[dsIndex];
    }

    /**
     * Returns the robin kept for the given datasource.
     *
     * @param dsIndex index of the datasource
     * @return robin object of that datasource
     */
    public Robin getRobin(int dsIndex) {
        return robins[dsIndex];
    }

    /**
     * Fetches one {@link FetchPoint} per archive step in the requested range,
     * for all datasources. Points outside the archive's time interval are
     * returned without any value being set on them.
     *
     * @param request fetch request; must not carry a datasource filter
     * @return fetched points in time order
     * @throws IOException  if the underlying storage cannot be accessed
     * @throws RrdException if the request carries a datasource filter
     */
    FetchPoint[] fetch(FetchRequest request) throws IOException, RrdException {
        if (request.getFilter() != null) {
            throw new RrdException("fetch() method does not support filtered datasources." +
                    " Use fetchData() to get filtered fetch data.");
        }
        long arcStep = getArcStep();
        long fetchStart = Util.normalize(request.getFetchStart(), arcStep);
        long fetchEnd = Util.normalize(request.getFetchEnd(), arcStep);
        // make sure the requested end falls inside the fetched range
        if (fetchEnd < request.getFetchEnd()) {
            fetchEnd += arcStep;
        }
        long startTime = getStartTime();
        long endTime = getEndTime();
        int dsCount = robins.length;
        int ptsCount = (int) ((fetchEnd - fetchStart) / arcStep + 1);
        FetchPoint[] points = new FetchPoint[ptsCount];
        for (int i = 0; i < ptsCount; i++) {
            long time = fetchStart + i * arcStep;
            FetchPoint point = new FetchPoint(time, dsCount);
            if (time >= startTime && time <= endTime) {
                // position of this timestamp relative to the archive start
                int robinIndex = (int) ((time - startTime) / arcStep);
                for (int j = 0; j < dsCount; j++) {
                    point.setValue(j, robins[j].getValue(robinIndex));
                }
            }
            points[i] = point;
        }
        return points;
    }

    /**
     * Fetches timestamps and values for the requested range. When the request
     * carries a filter only those datasources are fetched, otherwise all
     * datasources of the parent database. Timestamps outside the archive's
     * time interval get NaN values.
     *
     * @param request fetch request
     * @return fetched data
     * @throws IOException  if the underlying storage cannot be accessed
     * @throws RrdException if a requested datasource cannot be resolved
     */
    FetchData fetchData(FetchRequest request) throws IOException, RrdException {
        long arcStep = getArcStep();
        long fetchStart = Util.normalize(request.getFetchStart(), arcStep);
        long fetchEnd = Util.normalize(request.getFetchEnd(), arcStep);
        // make sure the requested end falls inside the fetched range
        if (fetchEnd < request.getFetchEnd()) {
            fetchEnd += arcStep;
        }
        long startTime = getStartTime();
        long endTime = getEndTime();
        String[] dsToFetch = request.getFilter();
        if (dsToFetch == null) {
            dsToFetch = parentDb.getDsNames();
        }
        int dsCount = dsToFetch.length;
        int ptsCount = (int) ((fetchEnd - fetchStart) / arcStep + 1);
        long[] timestamps = new long[ptsCount];
        double[][] values = new double[dsCount][ptsCount];
        // overlap between the requested range and the archive's own interval
        long matchStartTime = Math.max(fetchStart, startTime);
        long matchEndTime = Math.min(fetchEnd, endTime);
        double[][] robinValues = null;
        if (matchStartTime <= matchEndTime) {
            // read the overlapping part of every requested robin in one call
            int matchCount = (int) ((matchEndTime - matchStartTime) / arcStep + 1);
            int matchStartIndex = (int) ((matchStartTime - startTime) / arcStep);
            robinValues = new double[dsCount][];
            for (int i = 0; i < dsCount; i++) {
                int dsIndex = parentDb.getDsIndex(dsToFetch[i]);
                robinValues[i] = robins[dsIndex].getValues(matchStartIndex, matchCount);
            }
        }
        for (int ptIndex = 0; ptIndex < ptsCount; ptIndex++) {
            long time = fetchStart + ptIndex * arcStep;
            timestamps[ptIndex] = time;
            for (int i = 0; i < dsCount; i++) {
                double value = Double.NaN;
                // robinValues is non-null whenever this condition can hold
                if (time >= matchStartTime && time <= matchEndTime) {
                    int robinValueIndex = (int) ((time - matchStartTime) / arcStep);
                    value = robinValues[i][robinValueIndex];
                }
                values[i][ptIndex] = value;
            }
        }
        FetchData fetchData = new FetchData(this, request);
        fetchData.setTimestamps(timestamps);
        fetchData.setValues(values);
        return fetchData;
    }

    /**
     * Writes this archive to the given XML writer as an {@code rra} element
     * containing the definition, the per-datasource states ({@code cdp_prep})
     * and one {@code row} per archive row ({@code database}).
     *
     * @param writer destination XML writer
     * @throws IOException if the underlying storage cannot be accessed
     */
    void appendXml(XmlWriter writer) throws IOException {
        writer.startTag("rra");
        writer.writeTag("cf", consolFun.get());
        // read once here and reuse for every row below
        long arcStep = getArcStep();
        writer.writeComment(arcStep + " seconds");
        writer.writeTag("pdp_per_row", steps.get());
        writer.writeTag("xff", xff.get());
        writer.startTag("cdp_prep");
        for (int i = 0; i < states.length; i++) {
            states[i].appendXml(writer);
        }
        writer.closeTag(); // pairs with startTag("cdp_prep")
        writer.startTag("database");
        long startTime = getStartTime();
        int rowCount = rows.get();
        for (int i = 0; i < rowCount; i++) {
            long time = startTime + i * arcStep;
            writer.writeComment(Util.getDate(time) + " / " + time);
            writer.startTag("row");
            for (int j = 0; j < robins.length; j++) {
                writer.writeTag("v", robins[j].getValue(i));
            }
            writer.closeTag(); // pairs with startTag("row")
        }
        writer.closeTag(); // pairs with startTag("database")
        writer.closeTag(); // pairs with startTag("rra")
    }

    /**
     * Copies the states and robins of this archive to another archive.
     * Only datasources for which {@code Util.getMatchingDatasourceIndex}
     * returns a non-negative index in the other database are copied.
     *
     * @param other destination; must be an {@code Archive} with the same
     *              consolidation function and the same number of steps
     * @throws IOException  if the underlying storage cannot be accessed
     * @throws RrdException if {@code other} is {@code null}, is not an
     *                      {@code Archive}, or is incompatible with this archive
     */
    public void copyStateTo(RrdUpdater other) throws IOException, RrdException {
        if (!(other instanceof Archive)) {
            // instanceof is false for null, so guard the getClass() call
            String targetType = (other == null) ? "null" : other.getClass().getName();
            throw new RrdException(
                    "Cannot copy Archive object to " + targetType);
        }
        Archive arc = (Archive) other;
        if (!arc.consolFun.get().equals(consolFun.get())) {
            throw new RrdException("Incompatible consolidation functions");
        }
        if (arc.steps.get() != steps.get()) {
            throw new RrdException("Incompatible number of steps");
        }
        int count = parentDb.getHeader().getDsCount();
        for (int i = 0; i < count; i++) {
            int j = Util.getMatchingDatasourceIndex(parentDb, i, arc.parentDb);
            if (j >= 0) {
                states[i].copyStateTo(arc.states[j]);
                robins[i].copyStateTo(arc.robins[j]);
            }
        }
    }

    /**
     * Sets the xff value of this archive.
     *
     * @param xff new xff value; must be a number with {@code 0 <= xff < 1}
     * @throws RrdException if {@code xff} is NaN or outside that range
     * @throws IOException  if the underlying storage cannot be accessed
     */
    public void setXff(double xff) throws RrdException, IOException {
        // NaN fails both range comparisons, so it has to be rejected explicitly;
        // a NaN xff would make finalizeStep() store NaN for every row
        if (Double.isNaN(xff) || xff < 0D || xff >= 1D) {
            throw new RrdException("Invalid xff supplied (" + xff + "), must be >= 0 and < 1");
        }
        this.xff.set(xff);
    }

    /**
     * Returns the backend of the parent database.
     *
     * @return backend object
     */
    public RrdBackend getRrdBackend() {
        return parentDb.getRrdBackend();
    }

    /**
     * Returns the allocator of the parent database.
     *
     * @return allocator object
     */
    public RrdAllocator getRrdAllocator() {
        return parentDb.getRrdAllocator();
    }
}