1 25 26 package org.jrobin.core; 27 28 import java.io.IOException ; 29 30 40 41 public class Datasource implements RrdUpdater { 42 private RrdDb parentDb; 43 private RrdString dsName, dsType; 45 private RrdLong heartbeat; 46 private RrdDouble minValue, maxValue; 47 48 private RrdDouble lastValue; 50 private RrdLong nanSeconds; 51 private RrdDouble accumValue; 52 53 Datasource(RrdDb parentDb, DsDef dsDef) throws IOException { 54 boolean shouldInitialize = dsDef != null; 55 this.parentDb = parentDb; 56 dsName = new RrdString(this); 57 dsType = new RrdString(this); 58 heartbeat = new RrdLong(this); 59 minValue = new RrdDouble(this); 60 maxValue = new RrdDouble(this); 61 lastValue = new RrdDouble(this); 62 accumValue = new RrdDouble(this); 63 nanSeconds = new RrdLong(this); 64 if(shouldInitialize) { 65 dsName.set(dsDef.getDsName()); 66 dsType.set(dsDef.getDsType()); 67 heartbeat.set(dsDef.getHeartbeat()); 68 minValue.set(dsDef.getMinValue()); 69 maxValue.set(dsDef.getMaxValue()); 70 lastValue.set(Double.NaN); 71 accumValue.set(0.0); 72 Header header = parentDb.getHeader(); 73 nanSeconds.set(header.getLastUpdateTime() % header.getStep()); 74 } 75 } 76 77 Datasource(RrdDb parentDb, DataImporter reader, int dsIndex) throws IOException , RrdException { 78 this(parentDb, null); 79 dsName.set(reader.getDsName(dsIndex)); 80 dsType.set(reader.getDsType(dsIndex)); 81 heartbeat.set(reader.getHeartbeat(dsIndex)); 82 minValue.set(reader.getMinValue(dsIndex)); 83 maxValue.set(reader.getMaxValue(dsIndex)); 84 lastValue.set(reader.getLastValue(dsIndex)); 85 accumValue.set(reader.getAccumValue(dsIndex)); 86 nanSeconds.set(reader.getNanSeconds(dsIndex)); 87 } 88 89 String dump() throws IOException { 90 return "== DATASOURCE ==\n" + 91 "DS:" + dsName.get() + ":" + dsType.get() + ":" + 92 heartbeat.get() + ":" + minValue.get() + ":" + 93 maxValue.get() + "\nlastValue:" + lastValue.get() + 94 " nanSeconds:" + nanSeconds.get() + 95 " accumValue:" + accumValue.get() + "\n"; 96 } 97 98 103 public String getDsName() throws IOException { 104 return dsName.get(); 105 } 106 107 113 public String getDsType() throws IOException { 114 return dsType.get(); 115 } 116 117 123 124 public long getHeartbeat() throws IOException { 125 return heartbeat.get(); 126 } 127 128 134 public double getMinValue() throws IOException { 135 return minValue.get(); 136 } 137 138 144 public double getMaxValue() throws IOException { 145 return maxValue.get(); 146 } 147 148 154 public double getLastValue() throws IOException { 155 return lastValue.get(); 156 } 157 158 164 public double getAccumValue() throws IOException { 165 return accumValue.get(); 166 } 167 168 174 public long getNanSeconds() throws IOException { 175 return nanSeconds.get(); 176 } 177 178 void process(long newTime, double newValue) throws IOException , RrdException { 179 Header header = parentDb.getHeader(); 180 long step = header.getStep(); 181 long oldTime = header.getLastUpdateTime(); 182 long startTime = Util.normalize(oldTime, step); 183 long endTime = startTime + step; 184 double oldValue = lastValue.get(); 185 double updateValue = calculateUpdateValue(oldTime, oldValue, newTime, newValue); 186 if(newTime < endTime) { 187 accumulate(oldTime, newTime, updateValue); 188 } 189 else { 190 long boundaryTime = Util.normalize(newTime, step); 192 accumulate(oldTime, boundaryTime, updateValue); 193 double value = calculateTotal(startTime, boundaryTime); 194 long numSteps= (boundaryTime - endTime) / step + 1L; 196 parentDb.archive(this, value, numSteps); 198 nanSeconds.set(0); 200 accumValue.set(0.0); 201 accumulate(boundaryTime, newTime, updateValue); 202 } 203 } 204 205 private double calculateUpdateValue(long oldTime, double oldValue, 206 long newTime, double newValue) throws IOException { 207 double updateValue = Double.NaN; 208 if(newTime - oldTime <= heartbeat.get()) { 209 String type = dsType.get(); 210 if(type.equals("GAUGE")) { 211 updateValue = newValue; 212 } 213 else if(type.equals("ABSOLUTE")) { 214 if(!Double.isNaN(newValue)) { 215 updateValue = newValue / (newTime - oldTime); 216 } 217 } 218 else if(type.equals("DERIVE")) { 219 if(!Double.isNaN(newValue) && !Double.isNaN(oldValue)) { 220 updateValue = (newValue - oldValue) / (newTime - oldTime); 221 } 222 } 223 else if(type.equals("COUNTER")) { 224 if(!Double.isNaN(newValue) && !Double.isNaN(oldValue)) { 225 double diff = newValue - oldValue; 226 double max32bit = Math.pow(2, 32); 227 double max64bit = Math.pow(2, 64); 228 if(diff < 0) { 229 diff += max32bit; 230 } 231 if(diff < 0) { 232 diff += max64bit - max32bit; 233 } 234 if(diff >= 0) { 235 updateValue = diff / (newTime - oldTime); 236 } 237 } 238 } 239 if(!Double.isNaN(updateValue)) { 240 double minVal = minValue.get(); 241 double maxVal = maxValue.get(); 242 if(!Double.isNaN(minVal) && updateValue < minVal) { 243 updateValue = Double.NaN; 244 } 245 if(!Double.isNaN(maxVal) && updateValue > maxVal) { 246 updateValue = Double.NaN; 247 } 248 } 249 } 250 lastValue.set(newValue); 251 return updateValue; 252 } 253 254 private void accumulate(long oldTime, long newTime, double updateValue) throws IOException { 255 if(Double.isNaN(updateValue)) { 256 nanSeconds.set(nanSeconds.get() + (newTime - oldTime)); 257 } 258 else { 259 accumValue.set(accumValue.get() + updateValue * (newTime - oldTime)); 260 } 261 } 262 263 private double calculateTotal(long startTime, long boundaryTime) throws IOException { 264 double totalValue = Double.NaN; 265 long validSeconds = boundaryTime - startTime - nanSeconds.get(); 266 if(nanSeconds.get() <= heartbeat.get() && validSeconds > 0) { 267 totalValue = accumValue.get() / validSeconds; 268 } 269 return totalValue; 270 } 271 272 void appendXml(XmlWriter writer) throws IOException { 273 writer.startTag("ds"); 274 writer.writeTag("name", dsName.get()); 275 writer.writeTag("type", dsType.get()); 276 writer.writeTag("minimal_heartbeat", heartbeat.get()); 277 writer.writeTag("min", minValue.get()); 278 writer.writeTag("max", maxValue.get()); 279 writer.writeComment("PDP Status"); 280 writer.writeTag("last_ds", lastValue.get(), "UNKN"); 281 writer.writeTag("value", accumValue.get()); 282 writer.writeTag("unknown_sec", nanSeconds.get()); 283 writer.closeTag(); } 285 286 292 public void copyStateTo(RrdUpdater other) throws IOException , RrdException { 293 if(!(other instanceof Datasource)) { 294 throw new RrdException( 295 "Cannot copy Datasource object to " + other.getClass().getName()); 296 } 297 Datasource datasource = (Datasource) other; 298 if(!datasource.dsName.get().equals(dsName.get())) { 299 throw new RrdException("Incomaptible datasource names"); 300 } 301 if(!datasource.dsType.get().equals(dsType.get())) { 302 throw new RrdException("Incomaptible datasource types"); 303 } 304 datasource.lastValue.set(lastValue.get()); 305 datasource.nanSeconds.set(nanSeconds.get()); 306 datasource.accumValue.set(accumValue.get()); 307 } 308 309 314 public int getDsIndex() throws IOException { 315 try { 316 return parentDb.getDsIndex(dsName.get()); 317 } 318 catch(RrdException e) { 319 return -1; 320 } 321 } 322 323 329 public void setHeartbeat(long heartbeat) throws RrdException, IOException { 330 if(heartbeat < 1L) { 331 throw new RrdException("Invalid heartbeat specified: " + heartbeat); 332 } 333 this.heartbeat.set(heartbeat); 334 } 335 336 347 public void setMinValue(double minValue, boolean filterArchivedValues) 348 throws IOException , RrdException { 349 double maxValue = this.maxValue.get(); 350 if(!Double.isNaN(minValue) && !Double.isNaN(maxValue) && minValue >= maxValue) { 351 throw new RrdException("Invalid min/max values: " + minValue + "/" + maxValue); 352 } 353 this.minValue.set(minValue); 354 if(!Double.isNaN(minValue) && filterArchivedValues) { 355 int dsIndex = getDsIndex(); 356 Archive[] archives = parentDb.getArchives(); 357 for(int i = 0; i < archives.length; i++) { 358 archives[i].getRobin(dsIndex).filterValues(minValue, Double.NaN); 359 } 360 } 361 } 362 363 374 public void setMaxValue(double maxValue, boolean filterArchivedValues) 375 throws IOException , RrdException { 376 double minValue = this.minValue.get(); 377 if(!Double.isNaN(minValue) && !Double.isNaN(maxValue) && minValue >= maxValue) { 378 throw new RrdException("Invalid min/max values: " + minValue + "/" + maxValue); 379 } 380 this.maxValue.set(maxValue); 381 if(!Double.isNaN(maxValue) && filterArchivedValues) { 382 int dsIndex = getDsIndex(); 383 Archive[] archives = parentDb.getArchives(); 384 for(int i = 0; i < archives.length; i++) { 385 archives[i].getRobin(dsIndex).filterValues(Double.NaN, maxValue); 386 } 387 } 388 } 389 390 403 public void setMinMaxValue(double minValue, double maxValue, boolean filterArchivedValues) 404 throws IOException , RrdException { 405 if(!Double.isNaN(minValue) && !Double.isNaN(maxValue) && minValue >= maxValue) { 406 throw new RrdException("Invalid min/max values: " + minValue + "/" + maxValue); 407 } 408 this.minValue.set(minValue); 409 this.maxValue.set(maxValue); 410 if(!(Double.isNaN(minValue) && Double.isNaN(maxValue)) && filterArchivedValues) { 411 int dsIndex = getDsIndex(); 412 Archive[] archives = parentDb.getArchives(); 413 for(int i = 0; i < archives.length; i++) { 414 archives[i].getRobin(dsIndex).filterValues(minValue, maxValue); 415 } 416 } 417 } 418 419 424 public RrdBackend getRrdBackend() { 425 return parentDb.getRrdBackend(); 426 } 427 428 432 public RrdAllocator getRrdAllocator() { 433 return parentDb.getRrdAllocator(); 434 } 435 } 436 437 | Popular Tags |