1 5 package com.tctest.restart.system; 6 7 import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier; 8 import EDU.oswego.cs.dl.util.concurrent.LinkedNode; 9 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 10 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 11 12 import com.tc.exception.TCRuntimeException; 13 import com.tc.object.config.ConfigVisitor; 14 import com.tc.object.config.DSOClientConfigHelper; 15 import com.tc.object.config.TransparencyClassSpec; 16 import com.tc.object.config.spec.SynchronizedIntSpec; 17 import com.tc.simulator.app.ApplicationConfig; 18 import com.tc.simulator.listener.ListenerProvider; 19 import com.tc.simulator.listener.OutputListener; 20 import com.tc.util.Assert; 21 import com.tc.util.DebugUtil; 22 import com.tctest.runner.AbstractTransparentApp; 23 24 import java.util.ArrayList ; 25 import java.util.Collection ; 26 import java.util.Date ; 27 import java.util.HashMap ; 28 import java.util.HashSet ; 29 import java.util.Iterator ; 30 import java.util.List ; 31 import java.util.Map ; 32 33 public class ObjectDataRestartTestApp extends AbstractTransparentApp { 34 public static final String SYNCHRONOUS_WRITE = "synch-write"; 35 36 private int threadCount = 10; 37 private int workSize = 1 * 100; 38 private int testObjectDepth = 1 * 50; 39 private int iterationCount = 1 * 2; 41 private List workQueue = new ArrayList (); 42 private Collection resultSet = new HashSet (); 43 private SynchronizedInt complete = new SynchronizedInt(0); 44 private OutputListener out; 45 private SynchronizedInt nodes = new SynchronizedInt(0); 46 47 public ObjectDataRestartTestApp(String appId, ApplicationConfig cfg, ListenerProvider listenerProvider) { 48 super(appId, cfg, listenerProvider); 49 this.out = listenerProvider.getOutputListener(); 50 } 51 52 public void run() { 53 try { 54 WorkerFactory wf = new WorkerFactory(getApplicationId(), workQueue, resultSet, complete); 56 57 for (int i = 0; i < threadCount; i++) { 58 Worker worker = wf.newWorker(); 59 new Thread (worker).start(); 60 } 61 62 if (nodes.increment() == 1) { 63 populateWorkQueue(workSize, testObjectDepth, workQueue); 66 for (int i = 0; i < iterationCount; i++) { 67 synchronized (resultSet) { 68 while (resultSet.size() < workSize) { 69 try { 70 resultSet.wait(); 71 } catch (InterruptedException e) { 72 throw new TCRuntimeException(e); 73 } 74 } 75 } 76 verify(i + 1, resultSet); 77 if (i != (iterationCount - 1)) { 78 synchronized (resultSet) { 79 for (Iterator iter = resultSet.iterator(); iter.hasNext();) { 80 put(workQueue, iter.next()); 81 iter.remove(); 82 } 83 } 84 } 85 } 86 for (int i = 0; i < wf.getGlobalWorkerCount(); i++) { 87 put(workQueue, "STOP"); 88 } 89 } 90 } catch (Exception e) { 91 throw new TCRuntimeException(e); 92 } 93 } 94 95 private void populateWorkQueue(int size, int depth, List queue) { 96 System.err.println(" Thread - " + Thread.currentThread().getName() + " inside populateWorkQueue !"); 97 for (int i = 0; i < size; i++) { 98 TestObject to = new TestObject("" + i); 99 to.populate(depth); 100 put(queue, to); 101 } 102 } 103 104 private void verify(int expectedValue, Collection results) { 105 synchronized (results) { 106 Assert.assertEquals(workSize, results.size()); 107 for (Iterator i = results.iterator(); i.hasNext();) { 108 TestObject to = (TestObject) i.next(); 109 if (!to.validate(expectedValue)) { throw new RuntimeException ("Failed!"); } 110 } 111 } 112 } 113 114 public final void println(Object o) { 115 try { 116 out.println(o); 117 } catch (InterruptedException e) { 118 throw new TCRuntimeException(e); 119 } 120 } 121 122 public static void visitL1DSOConfig(ConfigVisitor visitor, DSOClientConfigHelper config) { 123 visitL1DSOConfig(visitor, config, new HashMap ()); 124 } 125 126 public static void visitL1DSOConfig(ConfigVisitor visitor, DSOClientConfigHelper config, Map optionalAttributes) { 127 DebugUtil.DEBUG = true; 128 129 boolean isSynchronousWrite = false; 130 if (optionalAttributes.size() > 0) { 131 isSynchronousWrite = Boolean.valueOf((String ) optionalAttributes.get(ObjectDataRestartTestApp.SYNCHRONOUS_WRITE)) 132 .booleanValue(); 133 } 134 135 visitor.visit(config, Barriers.class); 136 137 String testClassName = ObjectDataRestartTestApp.class.getName(); 138 TransparencyClassSpec spec = config.getOrCreateSpec(testClassName); 139 140 String idProviderClassname = IDProvider.class.getName(); 141 config.addIncludePattern(idProviderClassname); 142 143 String linkedQueueClassname = LinkedQueue.class.getName(); 144 config.addIncludePattern(linkedQueueClassname); 145 146 String linkedNodeClassname = LinkedNode.class.getName(); 147 config.addIncludePattern(linkedNodeClassname); 148 155 String testObjectClassname = TestObject.class.getName(); 156 config.addIncludePattern(testObjectClassname); 157 158 String workerClassname = Worker.class.getName(); 159 config.addIncludePattern(workerClassname); 160 161 spec.addRoot("workQueue", testClassName + ".workQueue"); 163 spec.addRoot("resultSet", testClassName + ".resultSet"); 164 spec.addRoot("complete", testClassName + ".complete"); 165 spec.addRoot("nodes", testClassName + ".nodes"); 166 167 String workerFactoryClassname = WorkerFactory.class.getName(); 168 config.addIncludePattern(workerFactoryClassname); 169 TransparencyClassSpec workerFactorySpec = config.getOrCreateSpec(workerFactoryClassname); 170 workerFactorySpec.addRoot("globalWorkerCount", workerFactoryClassname + ".globalWorkerCount"); 171 172 String verifyExpression = "* " + testClassName + ".verify(..)"; 174 addWriteAutolock(config, isSynchronousWrite, verifyExpression); 175 176 String runExpression = "* " + testClassName + ".run(..)"; 177 addWriteAutolock(config, isSynchronousWrite, runExpression); 178 179 String populateWorkQueueExpression = "* " + testClassName + ".populateWorkQueue(..)"; 180 addWriteAutolock(config, isSynchronousWrite, populateWorkQueueExpression); 181 182 String putExpression = "* " + testClassName + ".put(..)"; 183 addWriteAutolock(config, isSynchronousWrite, putExpression); 184 185 String takeExpression = "* " + testClassName + ".take(..)"; 186 addWriteAutolock(config, isSynchronousWrite, takeExpression); 187 188 String incrementExpression = "* " + testObjectClassname + ".increment(..)"; 190 addWriteAutolock(config, isSynchronousWrite, incrementExpression); 191 192 String populateExpression = "* " + testObjectClassname + ".populate(..)"; 193 addWriteAutolock(config, isSynchronousWrite, populateExpression); 194 195 String validateExpression = "* " + testObjectClassname + ".validate(..)"; 196 config.addReadAutolock(validateExpression); 197 198 String workerFactoryExpression = "* " + workerFactoryClassname + ".*(..)"; 200 addWriteAutolock(config, isSynchronousWrite, workerFactoryExpression); 201 202 String workerRunExpression = "* " + workerClassname + ".run(..)"; 204 addWriteAutolock(config, isSynchronousWrite, workerRunExpression); 205 206 new SynchronizedIntSpec().visit(visitor, config); 207 208 String nextIDExpression = "* " + idProviderClassname + ".nextID(..)"; 210 addWriteAutolock(config, isSynchronousWrite, nextIDExpression); 211 212 DebugUtil.DEBUG = false; 213 } 214 215 private static void addWriteAutolock(DSOClientConfigHelper config, boolean isSynchronousWrite, String methodPattern) { 216 if (isSynchronousWrite) { 217 config.addSynchronousWriteAutolock(methodPattern); 218 debugPrintln("***** doing a synchronous write"); 219 } else { 220 config.addWriteAutolock(methodPattern); 221 } 222 } 223 224 private static void debugPrintln(String s) { 225 if (DebugUtil.DEBUG) { 226 System.err.println(s); 227 } 228 } 229 230 public static final class WorkerFactory { 231 private int localWorkerCount = 0; 232 private final SynchronizedInt globalWorkerCount; private final List workQueue; 234 private final Collection results; 235 private final Collection localWorkers = new HashSet (); 236 private final SynchronizedInt complete; 237 private final String appId; 238 239 public WorkerFactory(String appId, List workQueue, Collection results, SynchronizedInt complete) { 240 this.appId = appId; 241 this.workQueue = workQueue; 242 this.results = results; 243 this.complete = complete; 244 this.globalWorkerCount = new SynchronizedInt(0); 245 } 246 247 public Worker newWorker() { 248 localWorkerCount++; 249 int globalWorkerID = globalWorkerCount.increment(); 250 Worker rv = new Worker("(" + appId + ") : Worker " + globalWorkerID + "," + localWorkerCount, this.workQueue, 252 this.results, this.complete); 253 this.localWorkers.add(rv); 254 return rv; 255 } 256 257 public int getGlobalWorkerCount() { 258 return globalWorkerCount.get(); 259 } 260 } 261 262 protected static final class Worker implements Runnable { 263 264 private final String name; 265 private final List workQueue; 266 private final Collection results; 267 private final SynchronizedInt workCompletedCount = new SynchronizedInt(0); 268 private final SynchronizedInt objectChangeCount = new SynchronizedInt(0); 269 270 public Worker(String name, List workQueue, Collection results, SynchronizedInt complete) { 271 this.name = name; 272 this.workQueue = workQueue; 273 this.results = results; 274 } 275 276 public void run() { 277 Thread.currentThread().setName(name); 278 try { 279 while (true) { 280 TestObject to; 281 Object o = take(workQueue); 282 if (o instanceof TestObject) { 283 to = (TestObject) o; 284 System.err.println(name + " : Got : " + to); 285 } else if ("STOP".equals(o)) { 286 return; 287 } else { 288 throw new RuntimeException ("Unexpected task: " + o); 289 } 290 objectChangeCount.add(to.increment()); 291 synchronized (results) { 292 results.add(to); 293 results.notifyAll(); 294 } 295 workCompletedCount.increment(); 296 } 297 } catch (Exception e) { 298 throw new TCRuntimeException(e); 299 } 300 } 301 302 public int getWorkCompletedCount() { 303 return this.workCompletedCount.get(); 304 } 305 306 public int getObjectChangeCount() { 307 return this.objectChangeCount.get(); 308 } 309 } 310 311 public static final class Barriers { 312 private final Map barriers; 313 private final int nodeCount; 314 315 public static void visitL1DSOConfig(ConfigVisitor visitor, DSOClientConfigHelper config) { 316 String classname = Barriers.class.getName(); 317 TransparencyClassSpec spec = config.getOrCreateSpec(classname); 318 spec.addRoot("barriers", classname + ".barriers"); 319 String barriersExpression = "* " + classname + ".*(..)"; 320 config.addWriteAutolock(barriersExpression); 321 322 String cyclicBarrierClassname = CyclicBarrier.class.getName(); 323 config.addIncludePattern(cyclicBarrierClassname); 324 325 String cyclicBarrierExpression = "* " + cyclicBarrierClassname + ".*(..)"; 327 config.addWriteAutolock(cyclicBarrierExpression); 328 } 329 330 public Barriers(int nodeCount) { 331 this.barriers = new HashMap (); 332 this.nodeCount = nodeCount; 333 } 334 335 public int barrier(int barrierID) { 336 try { 337 return getOrCreateBarrier(barrierID).barrier(); 338 } catch (InterruptedException e) { 339 throw new TCRuntimeException(e); 340 } 341 } 342 343 private CyclicBarrier getOrCreateBarrier(int barrierID) { 344 synchronized (barriers) { 345 Integer key = new Integer (barrierID); 346 CyclicBarrier rv = (CyclicBarrier) barriers.get(key); 347 if (rv == null) { 348 rv = new CyclicBarrier(this.nodeCount); 349 this.barriers.put(key, rv); 350 } 351 return rv; 352 } 353 } 354 355 } 356 357 protected static final class TestObject { 358 private TestObject child; 359 private int counter; 360 private List activity = new ArrayList (); 361 private String id; 362 363 public TestObject(String id) { 364 this.id = id; 365 } 366 367 private synchronized void addActivity(Object msg) { 368 activity.add(msg + "\n"); 369 } 370 371 public void populate(int count) { 372 TestObject to = this; 373 for (int i = 0; i < count; i++) { 374 synchronized (to) { 375 addActivity(this + ": Populated : (i,count) = (" + i + "," + count + ") @ " + new Date () + " by thread " 376 + Thread.currentThread().getName()); 377 to.child = new TestObject(id + "," + i); 378 } 379 to = to.child; 380 } 381 } 382 383 public int increment() { 384 TestObject to = this; 385 int currentValue = Integer.MIN_VALUE; 386 int changeCounter = 0; 387 do { 388 synchronized (to) { 389 if (currentValue == Integer.MIN_VALUE) { 392 currentValue = to.counter; 393 } 394 if (currentValue != to.counter) { throw new RuntimeException ("Expected current value=" + currentValue 395 + ", actual current value=" + to.counter); } 396 to.addActivity(this + ": increment <inside loop> : old value=" + to.counter + ", thread=" 397 + Thread.currentThread().getName() + " - " + to.counter + " @ " + new Date ()); 398 to.counter++; 399 changeCounter++; 400 } 401 } while ((to = to.getChild()) != null); 402 return changeCounter; 403 } 404 405 public boolean validate(int expectedValue) { 406 TestObject to = this; 407 do { 408 synchronized (to) { 411 if (to.counter != expectedValue) { 412 System.err.println("Expected " + expectedValue + " but found: " + to.counter + " on Test Object : " + to); 413 System.err.println(" To Activities = " + to.activity); 414 System.err.println(" This Activities = " + activity); 415 return false; 416 } 417 } 418 } while ((to = to.getChild()) != null); 419 return true; 420 } 421 422 private synchronized TestObject getChild() { 423 return child; 424 } 425 426 public String toString() { 427 return "TestObject@" + System.identityHashCode(this) + "(" + id + ")={ counter = " + counter + " }"; 428 } 429 } 430 431 protected static final class IDProvider { 432 private int current; 433 434 public synchronized Integer nextID() { 435 int rv = current++; 436 return new Integer (rv); 438 } 439 440 public synchronized Integer getCurrentID() { 441 return new Integer (current); 442 } 443 } 444 445 private static Object take(List workQueue2) { 446 synchronized (workQueue2) { 447 while (workQueue2.size() == 0) { 448 try { 449 System.err.println(Thread.currentThread().getName() + " : Going to sleep : Size = " + workQueue2.size()); 450 workQueue2.wait(); 451 System.err.println(Thread.currentThread().getName() + " : Waking from sleep : Size = " + workQueue2.size()); 452 } catch (InterruptedException e) { 453 throw new RuntimeException (e); 454 } 455 } 456 return workQueue2.remove(0); 457 } 458 } 459 460 private static void put(List workQueue2, Object o) { 461 synchronized (workQueue2) { 462 workQueue2.add(o); 463 workQueue2.notify(); 464 System.err.println(Thread.currentThread().getName() + " : notifying All : Size = " + workQueue2.size()); 465 } 466 } 467 } 468 | Popular Tags |