1 4 package com.tctest; 5 6 import com.tc.logging.TCLogger; 7 import com.tc.logging.TCLogging; 8 import com.tc.object.config.ConfigVisitor; 9 import com.tc.object.config.DSOClientConfigHelper; 10 import com.tc.simulator.app.ApplicationConfig; 11 import com.tc.simulator.listener.ListenerProvider; 12 import com.tctest.runner.AbstractTransparentApp; 13 14 import java.util.ArrayList ; 15 import java.util.Arrays ; 16 import java.util.HashSet ; 17 import java.util.Iterator ; 18 import java.util.LinkedList ; 19 import java.util.List ; 20 import java.util.Random ; 21 import java.util.Set ; 22 23 public class WaitNotifySystemTestApp extends AbstractTransparentApp { 24 private static final TCLogger logger = TCLogging.getTestingLogger(WaitNotifySystemTestApp.class); 25 26 private final List queue = new LinkedList (); 28 private final Set takers = new HashSet (); 29 private final Set putters = new HashSet (); 30 private final Set workers = new HashSet (); 31 private final List takeCounts = new LinkedList (); 32 private final Flag first = new Flag(); 33 34 private static final int PUTS = 50; 35 private static final boolean debug = true; 36 private Random random; 37 38 public WaitNotifySystemTestApp(String globalId, ApplicationConfig cfg, ListenerProvider listenerProvider) { 39 super(globalId, cfg, listenerProvider); 40 41 if (getParticipantCount() < 3) { throw new RuntimeException ("Must have at least 3 participants to run this test"); } 42 43 } 44 45 public static void visitL1DSOConfig(ConfigVisitor visitor, DSOClientConfigHelper config) { 46 String testClassName = WaitNotifySystemTestApp.class.getName(); 47 48 List roots = Arrays.asList(new Object [] { "queue", "takers", "putters", "workers", "takeCounts", "first" }); 49 for (Iterator iter = roots.iterator(); iter.hasNext();) { 50 String root = (String ) iter.next(); 51 config.addRoot(testClassName, root, root + "Lock", true); 52 System.err.println("Adding root for " + testClassName + "." + root); 53 } 54 55 String methodExpression = "* " + testClassName + "*.*(..)"; 56 System.err.println("Adding autolock for: " + methodExpression); 57 config.addWriteAutolock(methodExpression); 58 59 config.addIncludePattern(Flag.class.getName()); 60 config.addIncludePattern(WorkItem.class.getName()); 61 } 62 63 public void run() { 64 random = new Random (new Random (System.currentTimeMillis() + getApplicationId().hashCode()).nextLong()); 65 66 try { 67 run0(); 68 notifyResult(Boolean.TRUE); 69 } catch (Throwable t) { 70 notifyError(t); 71 notifyResult(Boolean.FALSE); 72 } 73 } 74 75 public void run0() throws Throwable { 76 final long id = new Long (getApplicationId()).longValue(); 77 78 if (first.attemptSet()) { 79 runMaster(getParticipantCount() - 1); 81 } else { 82 if (random.nextBoolean()) { 83 runPutter(id); 84 } else { 85 runTaker(id); 86 } 87 } 88 } 89 90 private void runTaker(long id) throws InterruptedException { 91 Thread.currentThread().setName("TAKER-" + id); 92 93 Long myID = new Long (id); 94 synchronized (takers) { 95 takers.add(myID); 96 } 97 98 synchronized (workers) { 99 workers.add(myID); 100 workers.notify(); 101 } 102 103 log("STARTED"); 104 105 long count = 0; 106 try { 107 synchronized (queue) { 108 while (true) { 109 if (queue.size() > 0) { 110 WorkItem wi = (WorkItem) queue.remove(0); 111 if (wi.isStop()) { 113 log("took a stop item"); 114 return; 115 } 116 count++; 117 } else { 118 if (random.nextBoolean()) { 119 long millis = random.nextInt(10000); 120 if (random.nextBoolean()) { 121 int nanos = random.nextInt(10000); 122 queue.wait(millis, nanos); 124 } else { 125 queue.wait(millis); 127 } 128 } else { 129 queue.wait(); 131 } 132 } 133 } 134 } 135 136 } finally { 137 log("adding to takeCount"); 138 synchronized (takeCounts) { 139 takeCounts.add(new Long (count)); 140 } 141 142 log("removing self from takers set"); 143 synchronized (takers) { 144 takers.remove(myID); 145 takers.notify(); 146 } 147 148 log("ENDED"); 149 } 150 } 151 152 private void runPutter(long id) { 153 Thread.currentThread().setName("PUTTER-" + id); 154 155 Long myID = new Long (id); 156 synchronized (putters) { 157 putters.add(myID); 158 } 159 160 synchronized (workers) { 161 workers.add(myID); 162 workers.notify(); 163 } 164 165 log("STARTED"); 166 167 try { 168 for (int i = 0; i < PUTS; i++) { 169 synchronized (queue) { 170 WorkItem newWork = new WorkItem(myID.toString() + "-" + i); 171 172 174 queue.add(newWork); 175 176 if (random.nextBoolean()) { 177 queue.notifyAll(); 179 } else { 180 queue.notify(); 182 } 183 } 184 } 185 } finally { 186 log("removing self from putters set"); 187 synchronized (putters) { 188 putters.remove(myID); 189 putters.notify(); 190 } 191 192 log("ENDED"); 193 } 194 } 195 196 private void runMaster(int workerCount) throws InterruptedException { 197 Thread.currentThread().setName("MASTER"); 198 199 waitForAllWorkers(workerCount); 200 201 log("All worker nodes started"); 202 203 final Long workerIDs[]; 204 synchronized (workers) { 205 workerIDs = (Long []) workers.toArray(new Long [] {}); 206 } 207 208 final long extraTakerID = getUniqueId(workerIDs); 209 final List next = new ArrayList (workers); 210 next.add(new Long (extraTakerID)); 211 final long extraPutterID = getUniqueId((Long []) next.toArray(new Long [] {})); 212 213 Thread extraTaker = new Thread ( new Runnable () { 217 public void run() { 218 try { 219 runTaker(extraTakerID); 220 } catch (Throwable t) { 221 WaitNotifySystemTestApp.this.notifyError(t); 222 } 223 } 224 }); 225 extraTaker.start(); 226 227 Thread extraPutter = new Thread (new Runnable () { 228 public void run() { 229 try { 230 runPutter(extraPutterID); 231 } catch (Throwable t) { 232 WaitNotifySystemTestApp.this.notifyError(t); 233 } 234 } 235 }); 236 extraPutter.start(); 237 238 workerCount += 2; 239 waitForAllWorkers(workerCount); 240 241 log("Extra workers started"); 242 243 final int numTakers; 244 synchronized (takers) { 245 numTakers = takers.size(); 246 } 247 log("takers count = " + numTakers); 248 249 final int numPutters = workerCount - numTakers; 250 log("putters count = " + numPutters); 251 252 synchronized (putters) { 254 while (putters.size() > 0) { 255 log("waiting for putters: " + putters.size()); 256 putters.wait(); 257 } 258 } 259 260 log("All putters done"); 261 262 synchronized (queue) { 264 for (int i = 0; i < numTakers; i++) { 265 queue.add(WorkItem.STOP); 266 } 267 queue.notifyAll(); 268 } 269 270 log("Takers told to stop"); 271 272 synchronized (takers) { 274 while (takers.size() > 0) { 275 log("waiting for takers: " + takers.size()); 276 takers.wait(); 277 } 278 } 279 280 log("Takers all done"); 281 282 long total = 0; 284 synchronized (takeCounts) { 285 log("Collecting take counts"); 286 287 if (takeCounts.size() != numTakers) { 288 throw new RuntimeException ("Wrong number of take counts: " + takeCounts.size() + " != " + numTakers); 290 } 291 292 for (Iterator iter = takeCounts.iterator(); iter.hasNext();) { 293 Long count = (Long ) iter.next(); 294 total += count.longValue(); 295 } 296 } 297 298 final int expectedTotal = numPutters * PUTS; 300 if (total != expectedTotal) { throw new RuntimeException ("Expected " + expectedTotal + ", but we got " + total); } 301 } 302 303 private void waitForAllWorkers(int workerCount) throws InterruptedException { 304 synchronized (workers) { 306 while (workers.size() < workerCount) { 307 final int lastCount = workers.size(); 308 log("waiting for workers " + workers.size()); 309 workers.wait(); 310 if (lastCount == workers.size()) { throw new Error ("Size didn't change!!!"); } 311 } 312 } 313 } 314 315 private long getUniqueId(Long [] workerIDs) { 316 while (true) { 317 final long candidate = random.nextInt(Integer.MAX_VALUE); 318 boolean okay = true; 319 for (int i = 0; i < workerIDs.length; i++) { 320 if (workerIDs[i].longValue() == candidate) { 321 okay = false; 322 break; 323 } 324 325 if (okay) { return candidate; } 326 } 327 } 328 } 329 330 private static void log(String msg) { 331 if (debug) logger.info(msg); 332 } 333 334 private static class Flag { 335 private boolean set = false; 336 337 synchronized boolean attemptSet() { 338 if (!set) { 339 set = true; 340 return true; 341 } 342 return false; 343 } 344 } 345 346 private static class WorkItem { 347 static final WorkItem STOP = new WorkItem("STOP"); 348 349 private final String name; 350 351 WorkItem(String name) { 352 this.name = name; 353 } 354 355 boolean isStop() { 356 return STOP.name.equals(name); 357 } 358 359 public String toString() { 360 return this.name; 361 } 362 } 363 364 }
| Popular Tags
|