1 4 package com.tc.test.transactions; 5 6 import org.apache.commons.lang.builder.EqualsBuilder; 7 8 import com.tc.util.Assert; 9 import com.tc.util.stringification.OurStringBuilder; 10 11 import java.util.HashSet ; 12 import java.util.Iterator ; 13 import java.util.Set ; 14 15 42 public class StandardTransactionalObject implements TransactionalObject { 43 44 private static interface Timing { 45 boolean isAfter(Timing other); 46 47 boolean isBefore(Timing other); 48 49 boolean isAfterOrEqual(Timing other); 50 51 boolean isBeforeOrEqual(Timing other); 52 53 long rawTime(); 54 } 55 56 private static class SequencedTiming implements Timing { 57 private final long actualTime; 58 private final long sequenceNumber; 59 60 public SequencedTiming(long actualTime, long sequenceNumber) { 61 this.actualTime = actualTime; 62 this.sequenceNumber = sequenceNumber; 63 } 64 65 public boolean isAfter(Timing rawOther) { 66 SequencedTiming other = (SequencedTiming) rawOther; 67 if (other == null) return true; 68 return this.actualTime > other.actualTime 69 || (this.actualTime == other.actualTime && this.sequenceNumber > other.sequenceNumber); 70 } 71 72 public boolean isBefore(Timing rawOther) { 73 SequencedTiming other = (SequencedTiming) rawOther; 74 if (other == null) return false; 75 return this.actualTime < other.actualTime 76 || (this.actualTime == other.actualTime && this.sequenceNumber < other.sequenceNumber); 77 } 78 79 public boolean isAfterOrEqual(Timing other) { 80 return isAfter(other) || equals(other); 81 } 82 83 public boolean isBeforeOrEqual(Timing other) { 84 return isBefore(other) || equals(other); 85 } 86 87 public boolean equals(Object that) { 88 if (!(that instanceof SequencedTiming)) return false; 89 SequencedTiming thatTiming = (SequencedTiming) that; 90 return new EqualsBuilder().append(this.actualTime, thatTiming.actualTime).append(this.sequenceNumber, 91 thatTiming.sequenceNumber) 92 .isEquals(); 93 } 94 95 public long rawTime() { 96 return this.actualTime; 97 } 98 99 public String toString() { 100 return new OurStringBuilder(this, OurStringBuilder.COMPACT_STYLE).append("actual time", this.actualTime) 101 .append("sequence number", this.sequenceNumber).toString(); 102 } 103 } 104 105 private static class BasicTiming implements Timing { 106 private final long time; 107 108 public BasicTiming(long time) { 109 this.time = time; 110 } 111 112 public boolean isAfter(Timing rawOther) { 113 if (rawOther == null) return true; 114 BasicTiming other = (BasicTiming) rawOther; 115 return this.time > other.time; 116 } 117 118 public boolean isBefore(Timing rawOther) { 119 if (rawOther == null) return false; 120 BasicTiming other = (BasicTiming) rawOther; 121 return this.time < other.time; 122 } 123 124 public boolean isAfterOrEqual(Timing other) { 125 return isAfter(other) || equals(other); 126 } 127 128 public boolean isBeforeOrEqual(Timing other) { 129 return isBefore(other) || equals(other); 130 } 131 132 public boolean equals(Object that) { 133 if (!(that instanceof BasicTiming)) return false; 134 BasicTiming thatTiming = (BasicTiming) that; 135 return new EqualsBuilder().append(this.time, thatTiming.time).isEquals(); 136 } 137 138 public long rawTime() { 139 return this.time; 140 } 141 142 public String toString() { 143 return new OurStringBuilder(this, OurStringBuilder.COMPACT_STYLE).append("time", this.time).toString(); 144 } 145 } 146 147 private static class Write implements Context { 148 private final Object value; 149 private final Timing startedAt; 150 private Timing committedAt; 151 152 public Write(Object value, Timing startedAt) { 153 Assert.assertNotNull(startedAt); 154 this.value = value; 155 this.startedAt = startedAt; 156 this.committedAt = null; 157 } 158 159 public void commit(Timing now) { 160 Assert.eval(this.committedAt == null); 161 this.committedAt = now; 162 } 163 164 public Timing startedAt() { 165 return this.startedAt; 166 } 167 168 public Timing committedAt() { 169 return this.committedAt; 170 } 171 172 public boolean isCommitted() { 173 return this.committedAt != null; 174 } 175 176 public Object value() { 177 return this.value; 178 } 179 180 public String toString() { 181 return new OurStringBuilder(this, OurStringBuilder.COMPACT_STYLE).append("value", value).append("started at", 182 startedAt) 183 .append("committed at", committedAt).toString(); 184 } 185 } 186 187 private static class Read implements Context { 188 private final Timing startedAt; 189 190 public Read(Timing startedAt) { 191 Assert.eval(startedAt != null); 192 this.startedAt = startedAt; 193 } 194 195 public Timing startedAt() { 196 return this.startedAt; 197 } 198 199 public String toString() { 200 return new OurStringBuilder(this, OurStringBuilder.COMPACT_STYLE).append("started at", startedAt).toString(); 201 } 202 } 203 204 private static final long DEFAULT_GC_SLOP = 60 * 1000; 205 206 private final String name; 207 private final long slop; 208 private final long gcSlop; 209 private final Set currentWrites; 210 private final Set currentReads; 211 private long lastSequence; 212 213 public StandardTransactionalObject(String name, long slop, long gcSlop, Object initialValue, long now) { 214 Assert.assertNotBlank(name); 215 Assert.eval(slop >= 0); 216 Assert.eval(gcSlop >= 0); 217 this.name = name; 218 this.slop = slop; 219 this.gcSlop = gcSlop; 220 this.currentWrites = new HashSet (); 221 this.currentReads = new HashSet (); 222 this.lastSequence = 0; 223 endWrite(startWrite(initialValue, now), now); 224 } 225 226 public StandardTransactionalObject(String name, long slop, Object initialValue, long now) { 227 this(name, slop, DEFAULT_GC_SLOP, initialValue, now); 228 } 229 230 public StandardTransactionalObject(String name, long slop, Object initialValue) { 231 this(name, slop, initialValue, System.currentTimeMillis()); 232 } 233 234 public StandardTransactionalObject(String name, Object initialValue, long now) { 235 this(name, 0, initialValue, now); 236 } 237 238 public StandardTransactionalObject(String name, Object initialValue) { 239 this(name, 0, initialValue, System.currentTimeMillis()); 240 } 241 242 private Timing createTiming(long time) { 243 return createTiming(time, 0); 244 } 245 246 private Timing createTiming(long time, long offset) { 247 if (this.slop == 0) return new SequencedTiming(time + offset, nextSequence()); 248 else return new BasicTiming(time + offset); 249 } 250 251 private Timing createTiming(Timing time, long offset) { 252 if (this.slop == 0) return new SequencedTiming(((SequencedTiming) time).actualTime + offset, nextSequence()); 253 else return new BasicTiming(((BasicTiming) time).time + offset); 254 } 255 256 private synchronized long nextSequence() { 257 return ++this.lastSequence; 258 } 259 260 public synchronized Context startWrite(Object value) { 261 return startWrite(value, System.currentTimeMillis()); 262 } 263 264 public synchronized Context startWrite(Object value, long now) { 265 Assert.eval(now >= 0); 266 267 gc(now); 268 269 Write out = new Write(value, createTiming(now)); 270 this.currentWrites.add(out); 271 return out; 272 } 273 274 public synchronized void endWrite(Context rawWrite) { 275 endWrite(rawWrite, System.currentTimeMillis()); 276 } 277 278 public synchronized void endWrite(Context rawWrite, long now) { 279 Assert.assertNotNull(rawWrite); 280 Assert.eval(now >= 0); 281 282 gc(now); 283 284 Write write = (Write) rawWrite; 285 Assert.eval("You can't commit a write before you started it, buddy.", createTiming(now) 286 .isAfterOrEqual(write.startedAt())); 287 write.commit(createTiming(now)); 288 } 289 290 public synchronized Context startRead() { 291 return startRead(System.currentTimeMillis()); 292 } 293 294 public synchronized Context startRead(long now) { 295 Assert.eval(now >= 0); 296 297 gc(now); 298 299 Read out = new Read(createTiming(now)); 300 this.currentReads.add(out); 301 return out; 302 } 303 304 public synchronized void endRead(Context rawRead, Object result) { 305 endRead(rawRead, result, System.currentTimeMillis()); 306 } 307 308 public synchronized void endRead(Context rawRead, Object result, long now) { 309 Assert.assertNotNull(rawRead); 310 Assert.eval(now >= 0); 311 312 gc(now); 313 314 Read read = (Read) rawRead; 315 316 Set withoutTooOld = removeNotInEffectAsOfTime(this.currentWrites, createTiming(read.startedAt(), -this.slop)); 317 Set potentials = removeTooYoung(withoutTooOld, createTiming(now)); 318 319 322 this.currentReads.remove(read); 323 324 Iterator iter = potentials.iterator(); 325 while (iter.hasNext()) { 326 if (valueEquals(result, ((Write) iter.next()).value())) return; 327 } 328 329 throw Assert.failure("Your read on object " + this + " was incorrect. You read the value " + result 330 + " at some point between " + read.startedAt() + " and " + now 331 + ", but the only writes that were in effect during that period are " + potentials); 332 } 333 334 private synchronized Set removeNotInEffectAsOfTime(Set source, Timing effectiveTime) { 335 Set out = new HashSet (); 336 337 Timing latestStart = null; 339 Iterator sourceIter = source.iterator(); 340 while (sourceIter.hasNext()) { 341 Write sourceWrite = (Write) sourceIter.next(); 342 if (!sourceWrite.isCommitted() || sourceWrite.committedAt().isAfterOrEqual(effectiveTime)) continue; 343 if (latestStart == null || latestStart.isBefore(sourceWrite.startedAt())) latestStart = sourceWrite.startedAt(); 344 } 345 346 sourceIter = source.iterator(); 348 while (sourceIter.hasNext()) { 349 Write sourceWrite = (Write) sourceIter.next(); 350 if (sourceWrite.isCommitted() && sourceWrite.committedAt().isBefore(latestStart)) continue; 351 out.add(sourceWrite); 352 } 353 354 return out; 355 } 356 357 private synchronized Set removeTooYoung(Set source, Timing notAfter) { 358 Set out = new HashSet (); 359 360 Iterator sourceIter = source.iterator(); 361 while (sourceIter.hasNext()) { 362 Write sourceWrite = (Write) sourceIter.next(); 363 if (sourceWrite.startedAt().isAfter(notAfter)) continue; 364 out.add(sourceWrite); 365 } 366 367 return out; 368 } 369 370 private synchronized void gc(long now) { 371 Timing gcTime = createTiming(Math.min(now, earliestReadStart()), -Math.max(this.slop, this.gcSlop)); 372 Set newSet = removeNotInEffectAsOfTime(this.currentWrites, gcTime); 373 374 this.currentWrites.clear(); 375 this.currentWrites.addAll(newSet); 376 } 377 378 private synchronized long earliestReadStart() { 379 long out = Long.MAX_VALUE; 380 Iterator iter = this.currentReads.iterator(); 381 while (iter.hasNext()) { 382 Read read = (Read) iter.next(); 383 out = Math.min(out, read.startedAt.rawTime()); 384 } 385 386 return out; 387 } 388 389 private boolean valueEquals(Object one, Object two) { 390 if ((one == null) != (two == null)) return false; 391 if (one == null) return true; 392 return one.equals(two); 393 } 394 395 public String toString() { 396 return new OurStringBuilder(this, OurStringBuilder.COMPACT_STYLE).append("name", this.name).append("slop", 397 this.slop) 398 .toString(); 399 } 400 401 } 402 | Popular Tags |