1 4 package com.tctest.performance.faulting; 5 6 import com.tc.object.config.ConfigVisitor; 7 import com.tc.object.config.DSOApplicationConfig; 8 import com.tc.object.config.DSOClientConfigHelper; 9 import com.tc.object.config.TransparencyClassSpec; 10 import com.tc.simulator.app.ApplicationConfig; 11 import com.tc.simulator.listener.ListenerProvider; 12 import com.tc.test.TempDirectoryHelper; 13 import com.tctest.performance.generate.load.LinearTransitionLoadGenerator; 14 import com.tctest.performance.generate.load.LoadGenerator; 15 import com.tctest.performance.generate.load.Measurement; 16 import com.tctest.performance.generate.load.Metronome; 17 import com.tctest.performance.generate.load.WorkQueueOverflowException; 18 import com.tctest.performance.results.PerformanceMeasurementMarshaller; 19 import com.tctest.performance.simulate.type.SimulatedType; 20 import com.tctest.performance.simulate.type.SimulatedTypeFactory; 21 import com.tctest.runner.AbstractTransparentApp; 22 23 import java.io.File ; 24 import java.util.ArrayList ; 25 import java.util.Arrays ; 26 import java.util.LinkedList ; 27 import java.util.List ; 28 import java.util.concurrent.CyclicBarrier ; 29 30 public abstract class AbstractDualQueueFaultTestApp extends AbstractTransparentApp { 31 32 private final List results1; 33 private final List results2; 34 private final List sharedResults1; 35 private final List sharedResults2; 36 private Metronome[] compiledResults1; 37 private Metronome[] compiledResults2; 38 private Measurement[] workQueueWaitTimes1; 39 private Measurement[] workQueueWaitTimes2; 40 private static final int DURATION = 60 * 10; 41 private static final int INIT_LOAD = 500; 42 private static final int MAX_LOAD = 2000; 43 private static final int PERCENT_UNIQUE = 100; 44 private final CyclicBarrier barrier; 45 private boolean isReader; 46 private boolean isLocalWriter; 47 private volatile boolean escape; 48 49 public AbstractDualQueueFaultTestApp(String appId, ApplicationConfig cfg, ListenerProvider listenerProvider) { 50 super(appId, cfg, listenerProvider); 51 results1 = new LinkedList (); 52 results2 = new LinkedList (); 53 barrier = new CyclicBarrier (getParticipantCount()); 54 sharedResults1 = new ArrayList (); 55 sharedResults2 = new ArrayList (); 56 } 57 58 protected static TransparencyClassSpec visitConfig(ConfigVisitor visitor, DSOClientConfigHelper config) { 59 TransparencyClassSpec spec = config.getOrCreateSpec(CyclicBarrier .class.getName()); 60 String className = AbstractDualQueueFaultTestApp.class.getName(); 61 spec = config.getOrCreateSpec(className); 62 63 String methodExpression = "* " + className + "*.*(..)"; 64 config.addWriteAutolock(methodExpression); 65 config.addWriteAutolock("* " + CyclicBarrier .class.getName() + "*.*(..)"); 66 67 spec.addRoot("isReader", "isReader"); 68 spec.addRoot("barrier", "barrier"); 69 spec.addRoot("sharedResults1", "sharedResults1"); 70 spec.addRoot("sharedResults2", "sharedResults2"); 71 72 return spec; 73 } 74 75 protected static void visitConfig(ConfigVisitor visitor, DSOApplicationConfig config) { 76 config.addIncludePattern(CyclicBarrier .class.getName()); 77 String className = AbstractDualQueueFaultTestApp.class.getName(); 78 config.addIncludePattern(className); 79 80 String methodExpression = "* " + className + "*.*(..)"; 81 config.addWriteAutolock(methodExpression); 82 config.addWriteAutolock("* " + CyclicBarrier .class.getName() + "*.*(..)"); 83 84 config.addRoot("isReader", className + ".isReader"); 85 config.addRoot("barrier", className + ".barrier"); 86 config.addRoot("sharedResults1", className + ".sharedResults1"); 87 config.addRoot("sharedResults2", className + ".sharedResults2"); 88 } 89 90 92 public void run() { 93 synchronized (barrier) { 94 if (!isReader) { 95 isReader = true; 96 isLocalWriter = true; 97 } 98 } 99 try { 100 barrier.await(); 101 102 if (isLocalWriter) doWriter(); 103 else doReader(); 104 105 barrier.await(); 106 107 System.err.println("DURATION COMPLETE"); 108 System.err.println("Compliling Shared Results"); 109 110 synchronized (sharedResults1) { 111 sharedResults1.addAll(results1); 112 } 113 synchronized (sharedResults2) { 114 sharedResults2.addAll(results2); 115 } 116 117 barrier.await(); 118 119 } catch (Throwable t) { 120 notifyError(t); 121 } 122 123 if (isLocalWriter) { 124 synchronized (sharedResults1) { 125 compiledResults1 = (Metronome[]) sharedResults1.toArray(new Metronome[0]); 126 Arrays.sort(sharedResults1.toArray(compiledResults1)); 127 } 128 synchronized (sharedResults2) { 129 compiledResults2 = (Metronome[]) sharedResults2.toArray(new Metronome[0]); 130 Arrays.sort(sharedResults2.toArray(compiledResults2)); 131 } 132 writeData(); 133 } 134 } 135 136 private void doWriter() throws Throwable { 137 Thread producer1 = new Thread () { 138 public void run() { 139 SimulatedType sInt = SimulatedTypeFactory.create(new Integer (0)); 140 LoadGenerator loadGenerator = new LinearTransitionLoadGenerator(); 141 loadGenerator.start(DURATION, INIT_LOAD, MAX_LOAD, sInt, PERCENT_UNIQUE); 142 try { 143 while (!escape) { 144 Object obj = loadGenerator.getNext(); 145 if (obj == null) { 146 workQueueWaitTimes1 = loadGenerator.getWaitTimes(); 147 break; } 149 populate1(obj); 150 } 151 } catch (WorkQueueOverflowException e) { 152 System.err.println("UPPER BOUND REACHED"); 153 workQueueWaitTimes1 = loadGenerator.getWaitTimes(); 154 escape = true; 155 return; 156 157 } catch (InterruptedException e) { 158 throw new RuntimeException (e); } 160 } 161 }; 162 producer1.setDaemon(true); 163 164 Thread producer2 = new Thread () { 165 public void run() { 166 SimulatedType sInt = SimulatedTypeFactory.create(new Integer (0)); 167 LoadGenerator loadGenerator = new LinearTransitionLoadGenerator(); 168 loadGenerator.start(DURATION, INIT_LOAD, MAX_LOAD, sInt, PERCENT_UNIQUE); 169 try { 170 while (!escape) { 171 Object obj = loadGenerator.getNext(); 172 if (obj == null) { 173 workQueueWaitTimes2 = loadGenerator.getWaitTimes(); 174 break; } 176 populate2(obj); 177 } 178 } catch (WorkQueueOverflowException e) { 179 System.err.println("UPPER BOUND REACHED"); 180 workQueueWaitTimes2 = loadGenerator.getWaitTimes(); 181 escape = true; 182 return; 183 184 } catch (InterruptedException e) { 185 throw new RuntimeException (e); } 187 } 188 }; 189 producer2.setDaemon(true); 190 191 producer1.start(); 192 producer2.start(); 193 System.err.println("LOAD STARTED"); 194 195 barrier.await(); 197 198 try { 199 producer1.join(); 201 producer2.join(); 202 } catch (Throwable t) { 203 notifyError(t); 204 } 205 } 206 207 private void doReader() throws Throwable { 208 Thread consumer1 = new Thread () { 209 public void run() { 210 try { 211 while (true) { 212 retrieve1(); 213 } 214 } catch (Throwable t) { 215 notifyError(t); 216 } 217 } 218 }; 219 consumer1.setDaemon(true); 220 221 Thread consumer2 = new Thread () { 222 public void run() { 223 try { 224 while (true) { 225 retrieve2(); 226 } 227 } catch (Throwable t) { 228 notifyError(t); 229 } 230 } 231 }; 232 consumer2.setDaemon(true); 233 234 barrier.await(); 236 consumer1.start(); 237 consumer2.start(); 238 } 239 240 protected abstract void populate1(Object data) throws InterruptedException ; 241 242 protected abstract void populate2(Object data) throws InterruptedException ; 243 244 protected abstract void retrieve1() throws InterruptedException ; 245 246 protected abstract void retrieve2() throws InterruptedException ; 247 248 protected abstract String title(); 249 250 protected List results1() { 251 return results1; 252 } 253 254 protected List results2() { 255 return results2; 256 } 257 258 protected List generateStatistics() { 259 List measurementList = new ArrayList (); 260 Metronome data; 261 262 Measurement[] stats1 = new Measurement[compiledResults1.length]; 263 for (int i = 0; i < stats1.length; i++) { 264 data = compiledResults1[i]; 265 stats1[i] = new Measurement(data.load, data.endtime - data.starttime); 266 } 267 Measurement[] stats2 = new Measurement[compiledResults2.length]; 268 for (int i = 0; i < stats2.length; i++) { 269 data = compiledResults2[i]; 270 stats2[i] = new Measurement(data.load, data.endtime - data.starttime); 271 } 272 273 measurementList.add(workQueueWaitTimes1); 274 measurementList.add(workQueueWaitTimes2); 275 measurementList.add(stats1); 276 measurementList.add(stats2); 277 278 return measurementList; 279 } 280 281 protected void writeData() { 282 try { 283 TempDirectoryHelper helper = new TempDirectoryHelper(getClass()); 284 File dataDir = helper.getDirectory(); 285 File output = new File (dataDir + File.separator + "results.data"); 286 output.createNewFile(); 287 System.err.println("WROTE RESULT DATA TO: " + output); 288 289 PerformanceMeasurementMarshaller.Header header = PerformanceMeasurementMarshaller.createHeader(); 290 header.title = title(); 291 header.xLabel = INIT_LOAD + " to " + MAX_LOAD + " Objects/sec."; 292 header.yLabel = "Time spent in queue (Milliseconds)"; 293 header.duration = DURATION; 294 295 String [] lineDescriptions = new String [] { "Work Queue1 Wait Time", "Work Queue2 Wait Time", 296 "Time Spent in Shared Queue - duration: " + header.duration + " sec. | " + header.xLabel }; 297 298 PerformanceMeasurementMarshaller.marshall(generateStatistics(), header, output, lineDescriptions); 299 300 } catch (Throwable t) { 301 notifyError(t); 302 } 303 } 304 } 305 | Popular Tags |