package org.jboss.aspects.txlock;

import org.jboss.logging.Logger;
import org.jboss.util.deadlock.DeadlockDetector;
import org.jboss.util.deadlock.Resource;

import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.Transaction;
import java.util.HashMap;
import java.util.LinkedList;

/**
 * A lock that is owned by at most one transaction at a time.
 *
 * <p>A transaction that finds the lock owned by a different transaction is
 * appended to a FIFO wait queue and blocks until {@link #nextTransaction}
 * hands ownership over to it, or until it notices that it has been marked
 * for rollback.
 *
 * <p>All mutable state of this class (owner, wait queue, flags) is guarded by
 * a hand-rolled, non-reentrant mutex: {@link #sync()} acquires it and
 * {@link #releaseSync()} releases it. A thread must never call
 * {@link #sync()} while it already holds the mutex.
 */
public class QueuedTxLock implements Resource
{
   /** Metadata group consulted by {@link #waitForTx} for lock settings. */
   public static final String TXLOCK = "TxLock";
   /** Metadata attribute holding the per-wait timeout passed to {@code Object.wait(long)}. */
   public static final String TIMEOUT = "timeout";

   /** Waiting locks indexed by themselves; equality is based on the waiting transaction. */
   private final HashMap txLocks = new HashMap();
   /** Waiting locks in arrival order; the head is the next owner. */
   private final LinkedList txWaitQueue = new LinkedList();

   /** The transaction currently owning this lock, or {@code null} when free. */
   private Transaction tx = null;
   /** {@code true} while some thread is inside the sync()/releaseSync() section. */
   private boolean synched = false;
   /** {@code true} once a {@link TxLockSynchronization} has been registered for the current owner. */
   private boolean isSynchronized = false;
   private final Logger log = Logger.getLogger(this.getClass());

   /**
    * @return the transaction currently owning this lock, or {@code null}
    */
   public Object getResourceHolder()
   {
      return tx;
   }

   /**
    * Enters the critical section guarding this lock's state, blocking while
    * another thread is inside it. Not reentrant.
    *
    * @throws InterruptedException if the thread is interrupted while waiting;
    *         in that case the critical section has NOT been entered
    */
   public void sync() throws InterruptedException
   {
      synchronized (this)
      {
         while (synched)
         {
            this.wait();
         }
         synched = true;
      }
   }

   /**
    * Leaves the critical section entered through {@link #sync()} and wakes one
    * thread waiting to enter it. Must only be called by the thread that
    * currently holds the section.
    */
   public void releaseSync()
   {
      synchronized (this)
      {
         synched = false;
         this.notify();
      }
   }

   /**
    * Enters the critical section like {@link #sync()} but keeps retrying when
    * interrupted, so that on return the section is always held.
    *
    * @return {@code true} if the thread was interrupted while waiting; the
    *         caller is responsible for restoring the interrupt status
    */
   private boolean syncUninterruptibly()
   {
      boolean interrupted = false;
      while (true)
      {
         try
         {
            sync();
            return interrupted;
         }
         catch (InterruptedException e)
         {
            // sync() did not set the flag, so retrying is safe.
            interrupted = true;
         }
      }
   }

   /**
    * Sets the owning transaction.
    *
    * @param tx the new owner, or {@code null} to mark the lock free
    */
   public void setTransaction(Transaction tx)
   {
      this.tx = tx;
   }

   /**
    * @return the transaction currently owning this lock, or {@code null}
    */
   public Transaction getTransaction()
   {
      return tx;
   }

   /**
    * Wait-queue entry for one transaction. Doubles as the monitor the waiting
    * thread blocks on. Two entries are equal when their transactions are equal.
    */
   private class TxLock
   {
      public Transaction waitingTx;
      public String threadName;
      /** {@code true} while the entry is still in the wait queue. */
      public boolean isQueued;

      public TxLock(Transaction trans)
      {
         this.threadName = Thread.currentThread().toString();
         this.waitingTx = trans;
         this.isQueued = true;
      }

      public boolean equals(Object obj)
      {
         if (obj == this) return true;
         // Reject null and foreign types instead of failing with
         // NullPointerException / ClassCastException.
         if (!(obj instanceof TxLock)) return false;

         TxLock lock = (TxLock) obj;
         if (waitingTx == null) return lock.waitingTx == null;
         return waitingTx.equals(lock.waitingTx);
      }

      public int hashCode()
      {
         return waitingTx == null ? 0 : waitingTx.hashCode();
      }

      public String toString()
      {
         StringBuffer buffer = new StringBuffer(100);
         buffer.append("TXLOCK waitingTx=").append(waitingTx);
         buffer.append(" thread=").append(threadName);
         buffer.append(" queued=").append(isQueued);
         return buffer.toString();
      }
   }

   /**
    * Returns the wait-queue entry for the given transaction, creating it and
    * appending it to the end of the queue if none exists yet.
    *
    * @param miTx the waiting transaction
    * @return the existing or newly queued entry, never {@code null}
    */
   protected TxLock getTxLock(Transaction miTx)
   {
      TxLock key = new TxLock(miTx);
      TxLock lock = (TxLock) txLocks.get(key);
      if (lock == null)
      {
         txLocks.put(key, key);
         txWaitQueue.addLast(key);
         lock = key;
      }
      return lock;
   }

   /** Removes a wait-queue entry from both the index and the queue. */
   private void dequeue(TxLock txLock)
   {
      txLocks.remove(txLock);
      txWaitQueue.remove(txLock);
   }

   /**
    * Tells whether the transaction has been marked for rollback.
    *
    * @param miTx the transaction to test, may be {@code null}
    * @return {@code true} only if {@code miTx} is non-null and its status is
    *         {@code Status.STATUS_MARKED_ROLLBACK}
    * @throws Exception if the transaction status cannot be obtained
    */
   protected boolean isTxExpired(Transaction miTx) throws Exception
   {
      return miTx != null && miTx.getStatus() == Status.STATUS_MARKED_ROLLBACK;
   }

   /**
    * Tries to take the lock for the given transaction without waiting.
    *
    * @param transaction the transaction requesting the lock
    * @return {@code true} if the lock was free or already owned by
    *         {@code transaction} (which now owns it); {@code false} if another
    *         transaction owns it
    * @throws Exception if interrupted while entering the critical section
    */
   public boolean lockNoWait(Transaction transaction) throws Exception
   {
      this.sync();
      try
      {
         // Tracing happens inside the try so a failure while building the
         // message cannot leave the critical section held.
         if (log.isTraceEnabled())
            log.trace("lockNoWait tx=" + transaction + " " + toString());

         if (getTransaction() != null && !getTransaction().equals(transaction))
         {
            return false;
         }
         setTransaction(transaction);
         return true;
      }
      finally
      {
         this.releaseSync();
      }
   }

   /**
    * Acquires the lock for the given transaction, waiting for the current
    * owner if necessary, and registers a transaction synchronization that
    * passes the lock on when the transaction completes.
    *
    * @param miTx the transaction of the invocation
    * @param mi   the invocation; its metadata supplies the wait timeout
    * @throws RuntimeException if the transaction is marked for rollback
    * @throws Exception on deadlock detection failure, interruption while
    *         entering the critical section, or synchronization registration
    *         failure
    */
   public void schedule(Transaction miTx, org.jboss.aop.joinpoint.Invocation mi)
           throws Exception
   {
      boolean trace = log.isTraceEnabled();
      this.sync();
      try
      {
         if (trace) log.trace("Begin schedule");

         if (isTxExpired(miTx))
         {
            log.error("Saw rolled back tx=" + miTx);
            throw new RuntimeException("Transaction marked for rollback, possibly a timeout");
         }

         waitForTx(mi, miTx, trace);
         if (!isSynchronized)
         {
            // Flip the flag only after registration succeeded; otherwise a
            // failed registration would suppress every later registration.
            miTx.registerSynchronization(new TxLockSynchronization());
            isSynchronized = true;
         }
      }
      finally
      {
         this.releaseSync();
      }
   }

   /**
    * Blocks until the lock is free or owned by {@code miTx}, then makes
    * {@code miTx} the owner.
    *
    * <p>Must be called with the critical section held; it is held again on
    * every exit path, normal or exceptional. While actually waiting the
    * section is released so that the owner can hand the lock over. Interrupts
    * received while waiting do not abort the wait; the thread's interrupt
    * status is restored before this method returns or throws.
    *
    * @param mi    the invocation; metadata {@code TXLOCK/TIMEOUT} (an
    *              {@code Integer}, 0 or absent meaning no timeout) bounds each
    *              individual wait, after which the loop re-checks its conditions
    * @param miTx  the transaction requesting the lock
    * @param trace whether trace logging is enabled
    * @throws RuntimeException if {@code miTx} is found marked for rollback
    *         after a wait
    * @throws Exception whatever the deadlock detector throws
    */
   protected void waitForTx(org.jboss.aop.joinpoint.Invocation mi, Transaction miTx, boolean trace) throws Exception
   {
      boolean wasScheduled = false;
      boolean interrupted = false;
      TxLock txLock = null;
      try
      {
         while (getTransaction() != null &&
                 !getTransaction().equals(miTx))
         {
            // Check for a deadlock on every cycle; on failure make sure we do
            // not stay behind in the wait queue.
            try
            {
               DeadlockDetector.singleton.deadlockDetection(miTx, this);
            }
            catch (Exception e)
            {
               if (txLock != null && txLock.isQueued)
               {
                  dequeue(txLock);
               }
               throw e;
            }

            wasScheduled = true;
            if (trace) log.trace("Transactional contention on context miTx=" + miTx + " " + toString());

            // Resolve the timeout while the critical section is still held, so
            // a failure here cannot leave this method without the section.
            int txTimeout = 0;
            Integer timeout = (Integer) mi.getMetaData(TXLOCK, TIMEOUT);
            if (timeout != null) txTimeout = timeout.intValue();

            if (txLock == null)
               txLock = getTxLock(miTx);

            if (trace) log.trace("Begin wait on " + txLock + " " + toString());

            boolean released = false;
            try
            {
               // The section is released only after the entry's monitor is
               // held, so a hand-over notification cannot slip in between the
               // release and the wait.
               synchronized (txLock)
               {
                  releaseSync();
                  released = true;
                  try
                  {
                     txLock.wait(txTimeout);
                  }
                  catch (InterruptedException e)
                  {
                     interrupted = true;
                  }
               }
            }
            finally
            {
               // Re-enter outside the entry's monitor (the notifier holds the
               // section and needs that monitor) and never give up: callers
               // release the section unconditionally.
               if (released && syncUninterruptibly())
               {
                  interrupted = true;
               }
            }

            if (trace) log.trace("End wait on " + txLock + " " + toString());
            if (isTxExpired(miTx))
            {
               log.error(Thread.currentThread() + "Saw rolled back tx=" + miTx + " waiting for txLock");
               if (txLock.isQueued)
               {
                  // Still waiting: just leave the queue.
                  dequeue(txLock);
               }
               else if (getTransaction() != null && getTransaction().equals(miTx))
               {
                  // Ownership was already handed to us: pass it on.
                  nextTransaction(trace);
               }
               if (miTx != null)
               {
                  DeadlockDetector.singleton.removeWaiting(miTx);
               }
               throw new RuntimeException("Transaction marked for rollback, possibly a timeout");
            }
         }

         // If we waited, nextTransaction() already installed us as the owner.
         if (!wasScheduled) setTransaction(miTx);
      }
      finally
      {
         if (interrupted)
         {
            Thread.currentThread().interrupt();
         }
      }
   }

   /**
    * Releases ownership and hands the lock to the transaction at the head of
    * the wait queue, if any, waking the thread(s) blocked on its entry.
    *
    * @param trace whether trace logging is enabled
    * @throws IllegalStateException if no thread is inside the critical section
    */
   protected void nextTransaction(boolean trace)
   {
      if (!synched)
      {
         throw new IllegalStateException("do not call nextTransaction while not synched!");
      }

      setTransaction(null);
      TxLock thelock = null;
      if (!txWaitQueue.isEmpty())
      {
         thelock = (TxLock) txWaitQueue.removeFirst();
         txLocks.remove(thelock);
         thelock.isQueued = false;
         if (thelock.waitingTx != null)
            DeadlockDetector.singleton.removeWaiting(thelock.waitingTx);
         setTransaction(thelock.waitingTx);
         synchronized (thelock)
         {
            thelock.notifyAll();
         }
      }
      if (trace)
         log.trace("nextTransaction: " + thelock + " " + toString());
   }

   /**
    * Ends the owning transaction's hold on the lock and passes it on.
    * Must be called with the critical section held.
    */
   public void endTransaction()
   {
      boolean trace = log.isTraceEnabled();
      if (trace)
         log.trace("endTransaction: " + toString());
      nextTransaction(trace);
   }

   /**
    * Hook invoked at the end of an invocation; only emits a trace message.
    *
    * @param thetx the transaction of the finished invocation
    */
   public void endInvocation(Transaction thetx)
   {
      if (log.isTraceEnabled())
         log.trace("endInvocation: miTx=" + thetx + " " + toString());
   }

   public String toString()
   {
      StringBuffer buffer = new StringBuffer(100);
      buffer.append(" hash=").append(hashCode());
      buffer.append(" tx=").append(getTransaction());
      buffer.append(" synched=").append(synched);
      buffer.append(" queue=").append(txWaitQueue);
      return buffer.toString();
   }

   /**
    * Transaction callback that passes the lock to the next waiting
    * transaction once the owning transaction has completed.
    */
   private final class TxLockSynchronization implements Synchronization
   {
      public void beforeCompletion()
      {
      }

      public void afterCompletion(int status)
      {
         // The lock state must only be touched while the critical section is
         // really held, so an interrupt must not cut the acquisition short.
         boolean interrupted = syncUninterruptibly();
         try
         {
            isSynchronized = false;
            endTransaction();
         }
         finally
         {
            releaseSync();
            if (interrupted)
            {
               Thread.currentThread().interrupt();
            }
         }
      }
   }
}