package org.jboss.ejb.plugins.lock;

import java.util.LinkedList;
import java.util.HashMap;
import java.util.ArrayList;

import javax.transaction.Transaction;
import javax.transaction.Status;

import org.jboss.invocation.Invocation;
import org.jboss.ejb.Container;
import org.jboss.ejb.EntityEnterpriseContext;
import org.jboss.monitor.LockMonitor;
import org.jboss.util.deadlock.DeadlockDetector;

/**
 * Pessimistic bean lock that parks contending callers in a FIFO wait queue.
 *
 * <p>A caller whose transaction differs from the transaction currently
 * associated with the lock is given a {@link TxLock} wait object, appended to
 * {@link #txWaitQueue}, and blocked on that object. When the owner finishes,
 * {@link #nextTransaction()} pops the head of the queue, makes its transaction
 * the new owner and wakes every thread waiting on that entry.</p>
 *
 * <p>All threads that share a transaction share a single queue entry (looked
 * up through {@link #txLocks}); each non-transactional caller gets its own
 * entry.</p>
 *
 * <p>NOTE(review): the fields {@code container}, {@code log}, {@code id},
 * {@code refs}, {@code synched} and {@code txTimeout} and the methods
 * {@code sync()}, {@code releaseSync()}, {@code getTransaction()} and
 * {@code setTransaction()} are inherited from {@code BeanLockSupport}, which
 * is not visible here; their exact semantics should be confirmed there.</p>
 */
public class QueuedPessimisticEJBLock extends BeanLockSupport
{
   /** Maps a transactional TxLock key to the queued TxLock for that transaction. */
   private HashMap txLocks = new HashMap();

   /** FIFO queue of TxLock entries waiting for ownership of this lock. */
   private LinkedList txWaitQueue = new LinkedList();

   /** Generator for the ids handed to non-transactional TxLock entries. */
   private int txIdGen = 0;

   /** Contention statistics sink; may be null, in which case nothing is recorded. */
   protected LockMonitor lockMonitor = null;

   /** Whether the deadlock detector is consulted while waiting. */
   protected boolean deadlockDetection = true;

   /**
    * Stores the container and picks up the lock monitor from the container's
    * lock manager.
    *
    * @param container the owning container
    */
   public void setContainer(Container container)
   {
      this.container = container;
      lockMonitor = container.getLockManager().getLockMonitor();
   }

   /**
    * @return true if deadlock detection is enabled for this lock
    */
   public boolean getDeadlockDetection()
   {
      return deadlockDetection;
   }

   /**
    * @param flag true to enable deadlock detection, false to disable it
    */
   public void setDeadlockDetection(boolean flag)
   {
      this.deadlockDetection = flag;
   }

   /**
    * Wait object for one queued transaction, or for one non-transactional
    * caller. Waiters block on the monitor of this object.
    */
   private class TxLock
   {
      /** The waiting transaction, or null for a non-transactional caller. */
      public Transaction waitingTx = null;

      /** Identity of a non-transactional entry; stays 0 for transactional ones. */
      public int id = 0;

      /** String form of the thread that created this entry (diagnostics only). */
      public String threadName;

      /** True while the entry is still in the wait queue. */
      public boolean isQueued;

      /**
       * Key handed to the deadlock detector: the transaction, or the creating
       * thread when there is no transaction.
       */
      public Object deadlocker;

      /**
       * Creates a queue entry for the given transaction.
       *
       * @param trans the waiting transaction, or null for a non-transactional
       *              caller (which is assigned a fresh id instead)
       */
      public TxLock(Transaction trans)
      {
         this.threadName = Thread.currentThread().toString();
         this.waitingTx = trans;
         if (trans == null)
         {
            // Wrap around to 0 once the generator has overflowed.
            if (txIdGen < 0) txIdGen = 0;
            this.id = txIdGen++;
            deadlocker = Thread.currentThread();
         }
         else
         {
            deadlocker = trans;
         }
         this.isQueued = true;
      }

      /**
       * Two entries are equal when both are non-transactional with the same
       * id, or both are transactional with equal transactions.
       *
       * @param obj the object to compare with; null or a foreign type yields false
       * @return true if the entries denote the same waiter
       */
      public boolean equals(Object obj)
      {
         if (obj == this) return true;
         // Guard the cast: the original threw on null or on a non-TxLock argument.
         if (!(obj instanceof TxLock)) return false;

         TxLock lock = (TxLock) obj;

         if (lock.waitingTx == null && this.waitingTx == null)
         {
            return lock.id == this.id;
         }
         else if (lock.waitingTx != null && this.waitingTx != null)
         {
            return lock.waitingTx.equals(this.waitingTx);
         }
         return false;
      }

      /**
       * Hash consistent with {@link #equals(Object)}: transactional entries
       * hash by their transaction, non-transactional ones by their id.
       *
       * <p>Returning the id for transactional entries (always 0) would put
       * every key of the txLocks map into the same bucket. This relies on the
       * Transaction implementation keeping hashCode consistent with equals.</p>
       *
       * @return the hash code of this entry
       */
      public int hashCode()
      {
         if (waitingTx != null)
         {
            return waitingTx.hashCode();
         }
         return this.id;
      }

      /**
       * @return a diagnostic description of this entry
       */
      public String toString()
      {
         StringBuffer buffer = new StringBuffer(100);
         buffer.append("TXLOCK waitingTx=").append(waitingTx);
         buffer.append(" id=").append(id);
         buffer.append(" thread=").append(threadName);
         buffer.append(" queued=").append(isQueued);
         return buffer.toString();
      }
   }

   /**
    * Returns the queue entry to wait on for the given transaction, creating
    * and enqueueing one if necessary.
    *
    * @param miTx the caller's transaction, or null for a non-transactional call
    * @return a fresh entry for a non-transactional caller; otherwise the entry
    *         already registered for the transaction, or a newly registered one
    */
   protected TxLock getTxLock(Transaction miTx)
   {
      TxLock lock = null;
      if (miTx == null)
      {
         // Non-transactional callers never share an entry and are not indexed.
         lock = new TxLock(null);
         txWaitQueue.addLast(lock);
      }
      else
      {
         TxLock key = new TxLock(miTx);
         lock = (TxLock) txLocks.get(key);
         if (lock == null)
         {
            txLocks.put(key, key);
            txWaitQueue.addLast(key);
            lock = key;
         }
      }
      return lock;
   }

   /**
    * Tells whether the transaction has been marked for rollback.
    *
    * @param miTx the transaction to inspect, may be null
    * @return true if miTx is non-null and its status is STATUS_MARKED_ROLLBACK
    * @throws Exception if the transaction status cannot be obtained
    */
   protected boolean isTxExpired(Transaction miTx) throws Exception
   {
      return miTx != null && miTx.getStatus() == Status.STATUS_MARKED_ROLLBACK;
   }

   /**
    * Schedules the invocation, retrying {@link #doSchedule(Invocation)} until
    * it reports success.
    *
    * @param mi the invocation to schedule
    * @throws Exception if scheduling fails, e.g. the transaction was marked
    *                   for rollback or a deadlock was detected
    */
   public void schedule(Invocation mi) throws Exception
   {
      boolean threadScheduled = false;
      while (!threadScheduled)
      {
         // Loop on lock wakeup and restart trying to schedule.
         threadScheduled = doSchedule(mi);
      }
   }

   /**
    * Performs one scheduling attempt while holding this lock's sync: rejects
    * transactions already marked for rollback, waits for ownership, and
    * reports the wait to the lock monitor when one is configured.
    *
    * @param mi the invocation to schedule
    * @return always true when the method returns normally
    * @throws Exception if the transaction is marked for rollback or waiting fails
    */
   protected boolean doSchedule(Invocation mi)
      throws Exception
   {
      boolean wasThreadScheduled = false;
      Transaction miTx = mi.getTransaction();
      boolean trace = log.isTraceEnabled();
      this.sync();
      try
      {
         if (trace) log.trace("Begin schedule, key=" + mi.getId());

         if (isTxExpired(miTx))
         {
            log.error("Saw rolled back tx=" + miTx);
            throw new RuntimeException("Transaction marked for rollback, possibly a timeout");
         }

         long startWait = System.currentTimeMillis();
         try
         {
            wasThreadScheduled = waitForTx(miTx, trace);
            if (wasThreadScheduled && lockMonitor != null)
            {
               long endWait = System.currentTimeMillis() - startWait;
               lockMonitor.finishedContending(endWait);
            }
         }
         catch (Exception throwable)
         {
            if (lockMonitor != null && isTxExpired(miTx))
            {
               lockMonitor.increaseTimeouts();
            }
            if (lockMonitor != null)
            {
               long endWait = System.currentTimeMillis() - startWait;
               lockMonitor.finishedContending(endWait);
            }
            throw throwable;
         }
      }
      finally
      {
         if (miTx == null && wasThreadScheduled)
         {
            // A non-transactional caller that came through the wait queue has
            // no transaction end to trigger the hand-over, so pass the lock
            // on to the next queued entry right away.
            nextTransaction();
         }
         this.releaseSync();
      }

      // Reaching this point means the thread may proceed.
      return true;
   }

   /**
    * Blocks until no other transaction is associated with this lock.
    *
    * <p>While another transaction owns the lock the caller is queued and waits
    * on its TxLock for at most {@code txTimeout} per round, releasing this
    * lock's sync for the duration of the wait. If the lock was free from the
    * start, miTx is installed as the owning transaction here; otherwise
    * ownership has been assigned by {@link #nextTransaction()}.</p>
    *
    * <p>An interrupt received while waiting does not abort the wait; it is
    * remembered and the thread's interrupt status is restored when this method
    * exits, normally or exceptionally.</p>
    *
    * @param miTx  the caller's transaction, or null for a non-transactional call
    * @param trace whether trace logging is enabled
    * @return true if the caller had to wait at least once, false if the lock
    *         was acquired without contention
    * @throws Exception if the deadlock detector rejects the wait, or if miTx is
    *                   found marked for rollback after a wait
    */
   protected boolean waitForTx(Transaction miTx, boolean trace) throws Exception
   {
      boolean interrupted = false;
      try
      {
         boolean wasScheduled = false;
         TxLock txLock = null;
         Object deadlocker = miTx;
         if (deadlocker == null) deadlocker = Thread.currentThread();

         // Loop while a different transaction is associated with the lock.
         while (getTransaction() != null &&
                !getTransaction().equals(miTx))
         {
            // Check for a deadlock on every cycle.
            try
            {
               if (deadlockDetection)
                  DeadlockDetector.singleton.deadlockDetection(deadlocker, this);
            }
            catch (Exception e)
            {
               // Leave the queue before propagating so no stale entry remains.
               if (txLock != null && txLock.isQueued)
               {
                  txLocks.remove(txLock);
                  txWaitQueue.remove(txLock);
               }
               throw e;
            }

            wasScheduled = true;
            if (lockMonitor != null) lockMonitor.contending();
            if (trace) log.trace("Transactional contention on context" + id);

            // Enqueue only once; later rounds reuse the same entry.
            if (txLock == null)
               txLock = getTxLock(miTx);

            if (trace) log.trace("Begin wait on Tx=" + getTransaction());

            synchronized (txLock)
            {
               // Release the bean lock's sync only after taking the TxLock
               // monitor, so a wake-up cannot slip in between.
               releaseSync();
               try
               {
                  txLock.wait(txTimeout);
               }
               catch (InterruptedException ignored)
               {
                  // Re-interrupting here would make the next wait() throw
                  // immediately and spin; restore the status on exit instead.
                  interrupted = true;
               }
            }
            this.sync();

            if (trace) log.trace("End wait on TxLock=" + getTransaction());
            if (isTxExpired(miTx))
            {
               log.error(Thread.currentThread() + "Saw rolled back tx=" + miTx + " waiting for txLock");
               if (txLock.isQueued)
               {
                  // Still waiting: simply withdraw from the queue.
                  txLocks.remove(txLock);
                  txWaitQueue.remove(txLock);
               }
               else if (getTransaction() != null && getTransaction().equals(miTx))
               {
                  // Already handed the lock: pass it on to the next waiter.
                  nextTransaction();
               }
               if (miTx != null)
               {
                  if (deadlockDetection)
                     DeadlockDetector.singleton.removeWaiting(deadlocker);
               }
               throw new RuntimeException("Transaction marked for rollback, possibly a timeout");
            }
         }

         if (!wasScheduled)
         {
            // Uncontended: take ownership directly.
            setTransaction(miTx);
         }
         return wasScheduled;
      }
      finally
      {
         if (interrupted)
         {
            Thread.currentThread().interrupt();
         }
      }
   }

   /**
    * Hands the lock to the next queued entry, if any: clears the current
    * transaction, dequeues the head of the wait queue, installs its
    * transaction as the owner and wakes all threads waiting on it.
    *
    * @throws IllegalStateException if {@code synched} is null, i.e. the method
    *                               is called without the lock being synched
    */
   protected void nextTransaction()
   {
      if (synched == null)
      {
         throw new IllegalStateException("do not call nextTransaction while not synched!");
      }

      setTransaction(null);
      if (!txWaitQueue.isEmpty())
      {
         TxLock thelock = (TxLock) txWaitQueue.removeFirst();
         txLocks.remove(thelock);
         thelock.isQueued = false;
         // The new owner is no longer waiting on anybody.
         setTransaction(thelock.waitingTx);
         if (deadlockDetection)
            DeadlockDetector.singleton.removeWaiting(thelock.deadlocker);

         synchronized (thelock)
         {
            // Wake every thread that shares this queue entry.
            thelock.notifyAll();
         }
      }
   }

   /**
    * Passes the lock on to the next queued entry.
    *
    * @param transaction the transaction that ended (not inspected)
    */
   public void endTransaction(Transaction transaction)
   {
      nextTransaction();
   }

   /**
    * Passes the lock on to the next queued entry.
    *
    * @param transaction the transaction that will not synchronize (not inspected)
    */
   public void wontSynchronize(Transaction transaction)
   {
      nextTransaction();
   }

   /**
    * Releases the lock at the end of an invocation when the invocation's
    * transaction owns the lock and its context is absent or has no
    * transaction synchronization.
    *
    * @param mi the invocation that is ending
    */
   public void endInvocation(Invocation mi)
   {
      Transaction tx = mi.getTransaction();
      if (tx != null && tx.equals(getTransaction()))
      {
         EntityEnterpriseContext ctx = (EntityEnterpriseContext) mi.getEnterpriseContext();
         if (ctx == null || ctx.hasTxSynchronization() == false)
            endTransaction(tx);
      }
   }

   /**
    * Decrements the reference count and verifies that the lock is not being
    * dropped in an inconsistent state.
    *
    * @throws IllegalStateException if the count reaches zero while entries are
    *                               still queued or a transaction is still set,
    *                               or if the count becomes negative
    */
   public void removeRef()
   {
      refs--;
      if (refs == 0 && txWaitQueue.size() > 0)
      {
         log.error("removing bean lock and it has tx's in QUEUE! " + toString());
         throw new IllegalStateException("removing bean lock and it has tx's in QUEUE!");
      }
      else if (refs == 0 && getTransaction() != null)
      {
         log.error("removing bean lock and it has tx set! " + toString());
         throw new IllegalStateException("removing bean lock and it has tx set!");
      }
      else if (refs < 0)
      {
         log.error("negative lock reference count should never happen !");
         throw new IllegalStateException("negative lock reference count !");
      }
   }

   /**
    * @return a diagnostic description of this lock, including a snapshot of
    *         the wait queue
    */
   public String toString()
   {
      StringBuffer buffer = new StringBuffer(100);
      buffer.append(super.toString());
      buffer.append(", bean=").append(container.getBeanMetaData().getEjbName());
      buffer.append(", id=").append(id);
      buffer.append(", refs=").append(refs);
      buffer.append(", tx=").append(getTransaction());
      buffer.append(", synched=").append(synched);
      buffer.append(", timeout=").append(txTimeout);
      buffer.append(", queue=").append(new ArrayList(txWaitQueue));
      return buffer.toString();
   }
}