1 8 9 package org.jboss.cache.tests.transaction; 10 11 import junit.framework.Test; 12 import junit.framework.TestCase; 13 import junit.framework.TestSuite; 14 import org.apache.commons.logging.Log; 15 import org.apache.commons.logging.LogFactory; 16 import org.jboss.cache.CacheException; 17 import org.jboss.cache.Fqn; 18 import org.jboss.cache.TreeCache; 19 import org.jboss.cache.lock.IsolationLevel; 20 import org.jboss.cache.lock.TimeoutException; 21 import org.jboss.cache.lock.UpgradeException; 22 import org.jboss.cache.transaction.DummyTransactionManager; 23 24 import javax.transaction.NotSupportedException ; 25 import javax.transaction.SystemException ; 26 import javax.transaction.Transaction ; 27 28 34 public class DeadlockTest extends TestCase { 35 TreeCache cache=null; 36 Exception thread_ex; 37 38 final Fqn NODE=Fqn.fromString("/a/b/c"); 39 final Fqn PARENT_NODE=Fqn.fromString("/a/b"); 40 final Fqn FQN1=NODE; 41 final Fqn FQN2=Fqn.fromString("/1/2/3"); 42 final Log log=LogFactory.getLog(DeadlockTest.class); 43 44 45 public DeadlockTest(String name) { 46 super(name); 47 } 48 49 public void setUp() throws Exception { 50 super.setUp(); 51 DummyTransactionManager.getInstance(); 52 cache=new TreeCache("test", null, 10000); 53 cache.setCacheMode(TreeCache.LOCAL); 54 cache.setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup"); 55 cache.setIsolationLevel(IsolationLevel.REPEATABLE_READ); 56 cache.setLockAcquisitionTimeout(3000); 57 cache.createService(); 58 cache.startService(); 59 thread_ex=null; 60 } 61 62 63 public void tearDown() throws Exception { 64 super.tearDown(); 65 if(cache != null) 66 cache.stopService(); 67 if(thread_ex != null) 68 throw thread_ex; 69 } 70 71 72 public void testConcurrentUpgrade() throws CacheException, InterruptedException { 73 MyThread t1=new MyThreadTimeout("MyThread#1", NODE); 74 MyThread t2=new MyThread("MyThread#2", NODE); 75 76 cache.put(NODE, null); 77 78 t1.start(); 79 t2.start(); 80 81 sleep(1000); 82 83 synchronized(t1) { 84 t1.notify(); } 86 87 sleep(1000); 88 89 synchronized(t2) { 90 t2.notify(); } 92 93 t1.join(); 94 t2.join(); 95 } 96 97 98 102 public void testPutDeadlock() throws CacheException, InterruptedException { 103 MyPutter t1=new MyPutterTimeout("MyPutter#1", FQN1, FQN2); 104 MyPutter t2=new MyPutter("MyPutter#2", FQN2, FQN1); 105 106 cache.put(FQN1, null); 107 cache.put(FQN2, null); 108 109 t1.start(); 110 t2.start(); 111 112 sleep(1000); 113 114 synchronized(t1) { 115 t1.notify(); } 117 118 sleep(1000); 119 120 synchronized(t2) { 121 t2.notify(); } 123 124 t1.join(); 125 t2.join(); 126 } 127 128 129 public void testCreateIfNotExistsLogic() throws CacheException, InterruptedException { 130 cache.put(NODE, null); 131 132 class T0 extends GenericThread { 133 public T0(String name) { 134 super(name); 135 } 136 137 protected void _run() throws Exception { 138 Transaction myTx=startTransaction(); 139 log("put(" + NODE + ")"); 140 cache.put(NODE, null); 141 log("put(" + NODE + "): OK"); 142 143 synchronized(this) {wait();} 144 145 log("remove(" + NODE + ")"); 146 cache.remove(NODE); 147 log("remove(" + NODE + "): OK"); 148 149 log("committing TX"); 150 myTx.commit(); 151 } 152 } 153 154 class T1 extends GenericThread { 155 public T1(String name) { 156 super(name); 157 } 158 159 protected void _run() throws Exception { 160 Transaction myTx=startTransaction(); 161 log("put(" + NODE + ")"); 162 cache.put(NODE, null); 163 log("put(" + NODE + "): OK"); 164 165 log("committing TX"); 166 myTx.commit(); 167 } 168 169 } 170 171 T0 t0=new T0("T0"); 172 t0.start(); 173 sleep(500); 174 T1 t1=new T1("T1"); 175 t1.start(); 176 sleep(500); 177 synchronized(t0) { 178 t0.notify(); 179 } 180 t0.join(); 181 t1.join(); 182 } 183 184 185 public void testMoreThanOneUpgrader() throws Exception { 186 final int NUM=2; 187 final Object lock=new Object (); 188 189 cache.put(NODE, "bla", "blo"); 190 191 MyUpgrader[] upgraders=new MyUpgrader[NUM]; 192 for(int i=0; i < upgraders.length; i++) { 193 upgraders[i]=new MyUpgrader("Upgrader#" + i, NODE, lock); 194 upgraders[i].start(); 195 } 196 197 sleep(1000); log("locks: " + cache.printLockInfo()); 199 200 201 synchronized(lock) { 202 lock.notifyAll(); 203 } 204 205 for(int i=0; i < upgraders.length; i++) { 207 MyThread upgrader=upgraders[i]; 208 upgrader.join(); 209 } 210 } 211 212 213 public void testPutsAndRemovesOnParentAndChildNodes() throws InterruptedException { 214 ContinuousPutter putter=new ContinuousPutter("Putter", NODE); 215 ContinuousRemover remover=new ContinuousRemover("Remover", PARENT_NODE); 216 putter.start(); 217 remover.start(); 218 sleep(5000); 219 log("stopping Putter"); 220 putter.looping=false; 221 log("stopping Remover"); 222 remover.looping=false; 223 putter.join(); 224 remover.join(); 225 } 226 227 public void testPutsAndRemovesOnParentAndChildNodesReversed() throws InterruptedException { 228 ContinuousPutter putter=new ContinuousPutter("Putter", PARENT_NODE); 229 ContinuousRemover remover=new ContinuousRemover("Remover", NODE); 230 putter.start(); 231 remover.start(); 232 sleep(5000); 233 log("stopping Putter"); 234 putter.looping=false; 235 log("stopping Remover"); 236 remover.looping=false; 237 putter.join(); 238 remover.join(); 239 } 240 241 public void testPutsAndRemovesOnSameNode() throws InterruptedException { 242 ContinuousPutter putter=new ContinuousPutter("Putter", NODE); 243 ContinuousRemover remover=new ContinuousRemover("Remover", NODE); 244 putter.start(); 245 remover.start(); 246 sleep(5000); 247 log("stopping Putter"); 248 putter.looping=false; 249 log("stopping Remover"); 250 remover.looping=false; 251 putter.join(); 252 remover.join(); 253 } 254 255 256 class GenericThread extends Thread { 257 protected Transaction tx; 258 protected boolean looping=true; 259 260 public GenericThread() { 261 262 } 263 264 public GenericThread(String name) { 265 super(name); 266 } 267 268 public void setLooping(boolean looping) { 269 this.looping=looping; 270 } 271 272 public void run() { 273 try { 274 _run(); 275 } 276 catch(Exception t) { 277 System.out.println(getName() + ": " + t); 278 if(thread_ex == null) 279 thread_ex=t; 280 } 281 if(log.isTraceEnabled()) 282 log.trace("Thread " + getName() + " terminated"); 283 } 284 285 protected void _run() throws Exception { 286 throw new UnsupportedOperationException (); 287 } 288 } 289 290 291 class ContinuousRemover extends GenericThread { 292 Fqn fqn; 293 294 public ContinuousRemover(String name, Fqn fqn) { 295 super(name); 296 this.fqn=fqn; 297 } 298 299 300 protected void _run() throws Exception { 301 while(thread_ex == null && looping) { 302 try { 303 if(interrupted()) 304 break; 305 tx=startTransaction(); 306 log("remove(" + fqn + ")"); 307 cache.remove(fqn); 308 sleep(random(20)); 309 tx.commit(); 310 } 311 catch(InterruptedException interrupted) { 312 tx.rollback(); 313 break; 314 } 315 catch(Exception ex) { 316 tx.rollback(); 317 throw ex; 318 } 319 } 320 } 321 } 322 323 class ContinuousPutter extends GenericThread { 324 Fqn fqn; 325 326 public ContinuousPutter(String name, Fqn fqn) { 327 super(name); 328 this.fqn=fqn; 329 } 330 331 332 protected void _run() throws Exception { 333 while(thread_ex == null && looping) { 334 try { 335 if(interrupted()) 336 break; 337 tx=startTransaction(); 338 log("put(" + fqn + ")"); 339 cache.put(fqn, "foo", "bar"); 340 sleep(random(20)); 341 tx.commit(); 342 } 343 catch(InterruptedException interrupted) { 344 tx.rollback(); 345 break; 346 } 347 catch(Exception ex) { 348 tx.rollback(); 349 throw ex; 350 } 351 } 352 } 353 } 354 355 public static long random(long range) { 356 return (long)((Math.random() * 100000) % range) + 1; 357 } 358 359 360 361 class MyThread extends GenericThread { 362 Fqn fqn; 363 364 365 public MyThread(String name, Fqn fqn) { 366 super(name); 367 this.fqn=fqn; 368 } 369 370 protected void _run() throws Exception { 371 tx=startTransaction(); 372 log("get(" + fqn + ")"); 373 cache.get(fqn, "bla"); log("done, locks: " + cache.printLockInfo()); 375 376 synchronized(this) {wait();} 377 378 log("put(" + fqn + ")"); 379 cache.put(fqn, "key", "val"); log("done, locks: " + cache.printLockInfo()); 381 tx.commit(); 382 log("committed TX, locks: " + cache.printLockInfo()); 383 } 384 } 385 386 387 class MyUpgrader extends MyThread { 388 Object lock; 389 390 public MyUpgrader(String name, Fqn fqn) { 391 super(name, fqn); 392 } 393 394 public MyUpgrader(String name, Fqn fqn, Object lock) { 395 super(name, fqn); 396 this.lock=lock; 397 } 398 399 protected void _run() throws Exception { 400 tx=startTransaction(); 401 log("get(" + fqn + ")"); 402 cache.get(fqn, "bla"); 404 synchronized(lock) {lock.wait();} 405 406 log("put(" + fqn + ")"); 407 cache.put(fqn, "key", "val"); log("done, locks: " + cache.printLockInfo()); 409 tx.commit(); 410 log("committed TX, locks: " + cache.printLockInfo()); 411 } 412 413 } 414 415 class MyThreadTimeout extends MyThread { 416 417 public MyThreadTimeout(String name, Fqn fqn) { 418 super(name, fqn); 419 } 420 421 protected void _run() throws Exception { 422 try { 423 super._run(); 424 } 425 catch(UpgradeException upgradeEx) { 426 log("received UpgradeException as expected"); 427 tx.rollback(); 428 log("rolled back TX, locks: " + cache.printLockInfo()); 429 } 430 catch(TimeoutException timeoutEx) { 431 log("received TimeoutException as expected"); 432 tx.rollback(); 433 log("rolled back TX, locks: " + cache.printLockInfo()); 434 } 435 } 436 } 437 438 439 440 class MyPutter extends GenericThread { 441 Fqn fqn1, fqn2; 442 443 public MyPutter(String name, Fqn fqn1, Fqn fqn2) { 444 super(name); 445 this.fqn1=fqn1; 446 this.fqn2=fqn2; 447 } 448 449 protected void _run() throws Exception { 450 tx=startTransaction(); 451 log("put(" + fqn1 + ")"); 452 cache.put(fqn1, "key", "val"); log("done, locks: " + cache.printLockInfo()); 454 synchronized(this) {wait();} 455 log("put(" + fqn2 + ")"); 456 cache.put(fqn2, "key", "val"); log("done, locks: " + cache.printLockInfo()); 458 tx.commit(); 459 log("committed TX, locks: " + cache.printLockInfo()); 460 } 461 } 462 463 class MyPutterTimeout extends MyPutter { 464 465 public MyPutterTimeout(String name, Fqn fqn1, Fqn fqn2) { 466 super(name, fqn1, fqn2); 467 } 468 469 protected void _run() throws Exception { 470 try { 471 super._run(); 472 } 473 catch(TimeoutException timeoutEx) { 474 log("received TimeoutException as expected"); 475 tx.rollback(); 476 log("rolled back TX, locks: " + cache.printLockInfo()); 477 } 478 } 479 } 480 481 482 483 private static void log(String msg) { 484 System.out.println(Thread.currentThread().getName() + ": " + msg); 485 } 486 487 private static void sleep(long timeout) { 488 try { 489 Thread.sleep(timeout); 490 } 491 catch(InterruptedException e) { 492 } 493 } 494 495 496 497 Transaction startTransaction() throws SystemException , NotSupportedException { 498 DummyTransactionManager mgr=DummyTransactionManager.getInstance(); 499 mgr.begin(); 500 Transaction tx=mgr.getTransaction(); 501 return tx; 502 } 503 504 505 506 public static Test suite() throws Exception { 507 return new TestSuite(DeadlockTest.class); 508 } 509 510 public static void main(String [] args) throws Exception { 511 junit.textui.TestRunner.run(suite()); 512 } 513 514 515 } 516 | Popular Tags |