1 4 package com.tctest.performance.faulting; 5 6 import com.tc.object.config.ConfigVisitor; 7 import com.tc.object.config.DSOApplicationConfig; 8 import com.tc.object.config.DSOClientConfigHelper; 9 import com.tc.object.config.TransparencyClassSpec; 10 import com.tc.simulator.app.ApplicationConfig; 11 import com.tc.simulator.listener.ListenerProvider; 12 import com.tc.test.TempDirectoryHelper; 13 import com.tctest.performance.generate.load.LinearTransitionLoadGenerator; 14 import com.tctest.performance.generate.load.LoadGenerator; 15 import com.tctest.performance.generate.load.Measurement; 16 import com.tctest.performance.generate.load.Metronome; 17 import com.tctest.performance.generate.load.WorkQueueOverflowException; 18 import com.tctest.performance.results.PerformanceMeasurementMarshaller; 19 import com.tctest.performance.simulate.type.SimulatedType; 20 import com.tctest.performance.simulate.type.SimulatedTypeFactory; 21 import com.tctest.runner.AbstractTransparentApp; 22 23 import java.io.File ; 24 import java.util.ArrayList ; 25 import java.util.Arrays ; 26 import java.util.LinkedList ; 27 import java.util.List ; 28 import java.util.concurrent.CyclicBarrier ; 29 30 public abstract class AbstractSingleQueueFaultTestApp extends AbstractTransparentApp { 31 32 private static final int DURATION = 60 * 10; 33 private static final int INIT_LOAD = 200; 34 private static final int MAX_LOAD = 2000; 35 private static final int PERCENT_UNIQUE = 100; 36 37 private final CyclicBarrier barrier; 38 private int writerCounter; 39 private final List sharedResults; 40 private boolean killWriters; 41 42 private boolean isLocalWriter, isMasterNode; 43 private final int writers; 44 private final List results; 45 private Measurement[] workQueueWaitTimes; 46 private Metronome[] compiledResults; 47 48 public AbstractSingleQueueFaultTestApp(String appId, ApplicationConfig cfg, ListenerProvider listenerProvider) { 49 super(appId, cfg, listenerProvider); 50 writers = getIntensity(); 51 results = new LinkedList (); 52 barrier = new CyclicBarrier (getParticipantCount()); 53 sharedResults = new ArrayList (); 54 } 55 56 protected static TransparencyClassSpec visitConfig(ConfigVisitor visitor, DSOClientConfigHelper config) { 57 TransparencyClassSpec spec = config.getOrCreateSpec(CyclicBarrier .class.getName()); 58 String className = AbstractSingleQueueFaultTestApp.class.getName(); 59 spec = config.getOrCreateSpec(className); 60 61 config.addWriteAutolock("* " + className + ".run()"); 62 config.addWriteAutolock("* " + className + ".doWriter()"); 63 config.addReadAutolock("* " + className + ".killWriters()"); 64 config.addWriteAutolock("* " + CyclicBarrier .class.getName() + "*.*(..)"); 65 66 spec.addRoot("isReader", "isReader"); 67 spec.addRoot("killWriters", "killWriters"); 68 spec.addRoot("barrier", "barrier"); 69 spec.addRoot("sharedResults", "sharedResults"); 70 spec.addRoot("writerCounter", "writerCounter"); 71 72 return spec; 73 } 74 75 protected static void visitConfig(ConfigVisitor visitor, DSOApplicationConfig config) { 76 config.addIncludePattern(CyclicBarrier .class.getName()); 77 String className = AbstractSingleQueueFaultTestApp.class.getName(); 78 config.addIncludePattern(className); 79 80 config.addWriteAutolock("* " + className + ".run()"); 81 config.addWriteAutolock("* " + className + ".doWriter()"); 82 config.addReadAutolock("* " + className + ".killWriters()"); 83 config.addWriteAutolock("* " + CyclicBarrier .class.getName() + "*.*(..)"); 84 85 config.addRoot("isReader", className + ".isReader"); 86 config.addRoot("killWriters", className + ".killWriters"); 87 config.addRoot("barrier", className + ".barrier"); 88 config.addRoot("sharedResults", className + ".sharedResults"); 89 config.addRoot("writerCounter", className + ".writerCounter"); 90 } 91 92 public void run() { 93 synchronized (barrier) { 94 if (++writerCounter <= writers) { 95 isLocalWriter = true; 96 } 97 } 98 try { 99 100 barrier.await(); 101 102 if (isLocalWriter) doWriter(); 103 else doReader(); 104 105 barrier.await(); 106 107 if (isMasterNode) { 108 System.err.println("DURATION COMPLETE - TIMESTAMP " + System.currentTimeMillis()); 109 System.err.println("Compliling Shared Results"); 110 } 111 112 synchronized (sharedResults) { 113 sharedResults.addAll(results); 114 } 115 116 barrier.await(); 117 118 } catch (Throwable t) { 119 notifyError(t); 120 } 121 122 if (isMasterNode) { 123 synchronized (sharedResults) { 124 compiledResults = (Metronome[]) sharedResults.toArray(new Metronome[0]); 125 Arrays.sort(sharedResults.toArray(compiledResults)); 126 } 127 writeData(); 128 } 129 } 130 131 private void doWriter() throws Throwable { 132 Thread producer = new Thread () { 133 public void run() { 134 int init = INIT_LOAD / (getParticipantCount() - 1); 135 int max = MAX_LOAD / (getParticipantCount() - 1); 136 SimulatedType sInt = SimulatedTypeFactory.create(new Integer (0)); 137 LoadGenerator loadGenerator = new LinearTransitionLoadGenerator(); 138 loadGenerator.start(DURATION, init, max, sInt, PERCENT_UNIQUE); 139 try { 140 int count = 0; 141 while (true) { 142 if (count > 100) { 143 count = 0; 144 if (killWriters()) return; 145 } 146 Object obj = loadGenerator.getNext(); 147 if (obj == null) { 148 workQueueWaitTimes = loadGenerator.getWaitTimes(); 149 isMasterNode = true; 150 break; } 152 populate(obj); 153 } 154 } catch (WorkQueueOverflowException e) { 155 System.err.println("UPPER BOUND REACHED"); 156 synchronized (this) { 157 killWriters = true; 158 isMasterNode = true; 159 } 160 workQueueWaitTimes = loadGenerator.getWaitTimes(); 161 return; 162 } catch (InterruptedException e) { 163 throw new RuntimeException (e); } 165 } 166 }; 167 producer.setDaemon(true); 168 producer.start(); 169 170 System.err.println("LOAD STARTED - DURATION " + DURATION + "sec (15sec prep) - TIMESTAMP " 171 + System.currentTimeMillis()); 172 173 barrier.await(); 175 176 try { 177 producer.join(); 179 } catch (Throwable t) { 180 notifyError(t); 181 } 182 } 183 184 private void doReader() throws Throwable { 185 Thread consumer = new Thread () { 186 public void run() { 187 try { 188 while (true) { 189 retrieve(); 190 } 191 } catch (Throwable t) { 192 notifyError(t); 193 } 194 } 195 }; 196 consumer.setDaemon(true); 197 198 barrier.await(); 200 consumer.start(); 201 } 202 203 private synchronized boolean killWriters() { 204 return killWriters; 205 } 206 207 protected abstract void populate(Object data) throws InterruptedException ; 208 209 protected abstract void retrieve() throws InterruptedException ; 210 211 protected abstract String title(); 212 213 protected List results() { 214 return results; 215 } 216 217 protected List generateStatistics() { 218 List measurementList = new ArrayList (); 219 Measurement[] stats = new Measurement[compiledResults.length]; 220 Metronome data; 221 for (int i = 0; i < stats.length; i++) { 222 data = compiledResults[i]; 223 stats[i] = new Measurement(data.load, data.endtime - data.starttime); 224 } 225 measurementList.add(workQueueWaitTimes); 226 measurementList.add(stats); 227 228 return measurementList; 229 } 230 231 protected void writeData() { 232 try { 233 TempDirectoryHelper helper = new TempDirectoryHelper(getClass()); 234 File dataDir = helper.getDirectory(); 235 File output = new File (dataDir + File.separator + "results.data"); 236 output.createNewFile(); 237 System.err.println("WROTE RESULT DATA TO: " + output); 238 239 PerformanceMeasurementMarshaller.Header header = PerformanceMeasurementMarshaller.createHeader(); 240 header.title = title(); 241 header.xLabel = INIT_LOAD + " to " + MAX_LOAD + " Objects/sec."; 242 header.yLabel = "Time spent in queue (Milliseconds)"; 243 header.duration = DURATION; 244 245 String [] lineDescriptions = new String [] { "Work Queue Wait Time", 246 "Time Spent in Shared Queue - duration: " + header.duration + " sec. | " + header.xLabel }; 247 248 PerformanceMeasurementMarshaller.marshall(generateStatistics(), header, output, lineDescriptions); 249 250 } catch (Throwable t) { 251 notifyError(t); 252 } 253 } 254 } 255 | Popular Tags |