package com.tc.async.impl;

import com.tc.async.api.StageMonitor;
import com.tc.text.StringFormatter;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Records the queue depth observed at the start of each event and summarizes
 * those observations (event count, events per second, min/max/average queue
 * depth) for the period since the last flush.
 *
 * <p>All public methods synchronize on this instance, so recording, analyzing
 * and flushing see a consistent view of the collected snapshots.
 *
 * <p>Raw collection types and explicit boxing constructors are kept on purpose:
 * the file is written in pre-generics Java and no newer language features are
 * used elsewhere in it.
 */
class StageMonitorImpl implements StageMonitor {

  /** Stage name after being passed through {@code formatter.rightPad(30, ...)}. */
  private final String name;
  private final StringFormatter formatter;

  /** One {@link Snapshot} per call to {@link #eventBegin(int)} since the last flush. */
  private final List snapshots = new ArrayList();
  /** Wall-clock time (ms) at which the current measurement period started. */
  private long begin = System.currentTimeMillis();

  /**
   * @param name stage name used as the first column of the dump line
   * @param formatter used to pad the name and every numeric column
   */
  StageMonitorImpl(String name, StringFormatter formatter) {
    this.name = formatter.rightPad(30, name);
    this.formatter = formatter;
  }

  /**
   * Records the queue depth observed when an event begins.
   *
   * @param queueDepth the queue depth at the time of the event
   */
  public synchronized void eventBegin(int queueDepth) {
    snapshots.add(new Snapshot(queueDepth));
  }

  /**
   * Formats the statistics for the current period and then starts a new period.
   *
   * @return a single formatted line describing the period that just ended
   */
  public synchronized String dumpAndFlush() {
    StringBuffer rv = new StringBuffer();
    dump(rv);
    flush();
    return rv.toString();
  }

  /**
   * Appends the formatted statistics of the current period to {@code buf}.
   * The elapsed time is taken from {@link #analyze()}, so no separate value
   * needs to be passed in.
   *
   * @param buf buffer to append to
   * @return {@code buf}, for chaining
   */
  private StringBuffer dump(StringBuffer buf) {
    Analysis an = analyze();
    buf.append(name).append("| period: ").append(formatter.leftPad(10, an.getElapsedTime())).append("ms.| events: ")
        .append(formatter.leftPad(10, an.getEventCount()));

    buf.append("| events/sec: ").append(formatter.leftPad(10, an.getEventsPerSecond()));

    buf.append("| Q depth, min: ").append(formatter.leftPad(10, an.getMinQueueDepth()));
    buf.append(", max: ").append(formatter.leftPad(10, an.getMaxQueueDepth()));
    buf.append(", avg: ").append(formatter.leftPad(10, an.getAvgQueueDepth()));

    return buf;
  }

  /**
   * Divides as doubles, guarding against a non-positive denominator.
   *
   * @return {@code numerator / denominator}, or {@code -1} when the
   *         denominator is zero or negative
   */
  private Double safeDiv(long numerator, long denominator) {
    if (denominator > 0) {
      return new Double(((double) numerator) / ((double) denominator));
    } else {
      return new Double(-1);
    }
  }

  /**
   * Summarizes the snapshots collected since the last flush without clearing them.
   *
   * <p>When no snapshot has been recorded, the minimum depth is reported as
   * {@code -1}, the maximum as {@code 0} and the average as {@code -1}.
   *
   * @return the statistics for the current period
   */
  public synchronized Analysis analyze() {
    long elapsed = System.currentTimeMillis() - begin;
    int count = snapshots.size();
    int min = -1, max = 0;
    long sum = 0;
    for (Iterator i = snapshots.iterator(); i.hasNext();) {
      int qd = ((Snapshot) i.next()).getQueueDepth();
      // min starts at the -1 sentinel, so the first snapshot always replaces it
      if (qd < min || min < 0) min = qd;
      if (qd > max) max = qd;
      sum += qd;
    }

    // Multiply in long arithmetic (1000L): an int product wraps around once
    // more than 2,147,483 events are recorded in a single period.
    return new AnalysisImpl(new Long(elapsed), new Integer(count),
                            safeDiv(count * 1000L, elapsed), new Integer(min), new Integer(max),
                            safeDiv(sum, count));
  }

  /** Discards all collected snapshots and restarts the measurement period at the current time. */
  public synchronized void flush() {
    snapshots.clear();
    begin = System.currentTimeMillis();
  }

  /** Immutable holder for the values computed by {@link StageMonitorImpl#analyze()}. */
  public static class AnalysisImpl implements Analysis {
    private final Number eventCount;
    private final Number eventsPerSecond;
    private final Number minQueueDepth;
    private final Number maxQueueDepth;
    private final Number avgQueueDepth;
    private final Number elapsedTime;

    private AnalysisImpl(Number elapsedTime, Number eventCount, Number eventsPerSecond, Number minQueueDepth,
                         Number maxQueueDepth, Number avgQueueDepth) {
      this.elapsedTime = elapsedTime;
      this.eventCount = eventCount;
      this.eventsPerSecond = eventsPerSecond;
      this.minQueueDepth = minQueueDepth;
      this.maxQueueDepth = maxQueueDepth;
      this.avgQueueDepth = avgQueueDepth;
    }

    /** @return length of the analyzed period in milliseconds */
    public Number getElapsedTime() {
      return elapsedTime;
    }

    /** @return average recorded queue depth, or -1 when nothing was recorded */
    public Number getAvgQueueDepth() {
      return avgQueueDepth;
    }

    /** @return largest recorded queue depth (0 when nothing was recorded) */
    public Number getMaxQueueDepth() {
      return maxQueueDepth;
    }

    /** @return smallest recorded queue depth, or -1 when nothing was recorded */
    public Number getMinQueueDepth() {
      return minQueueDepth;
    }

    /** @return recorded events per second, or -1 when the elapsed time is not positive */
    public Number getEventsPerSecond() {
      return eventsPerSecond;
    }

    /** @return number of events recorded in the period */
    public Number getEventCount() {
      return eventCount;
    }

  }

  /** A single queue-depth observation. */
  private static class Snapshot {
    private final int queueDepth;

    private Snapshot(int queueDepth) {
      this.queueDepth = queueDepth;
    }

    public int getQueueDepth() {
      return queueDepth;
    }
  }
}