1 8 9 package org.jboss.cache.transaction; 10 11 import junit.framework.Test; 12 import junit.framework.TestCase; 13 import junit.framework.TestSuite; 14 import org.apache.commons.logging.Log; 15 import org.apache.commons.logging.LogFactory; 16 import org.jboss.cache.CacheException; 17 import org.jboss.cache.CacheImpl; 18 import org.jboss.cache.Fqn; 19 import org.jboss.cache.config.Configuration; 20 import org.jboss.cache.lock.IsolationLevel; 21 import org.jboss.cache.lock.TimeoutException; 22 import org.jboss.cache.lock.UpgradeException; 23 import org.jboss.cache.misc.TestingUtil; 24 25 import javax.transaction.NotSupportedException ; 26 import javax.transaction.SystemException ; 27 import javax.transaction.Transaction ; 28 29 35 public class DeadlockTest extends TestCase 36 { 37 CacheImpl cache = null; 38 Exception thread_ex; 39 40 final Fqn NODE = Fqn.fromString("/a/b/c"); 41 final Fqn PARENT_NODE = Fqn.fromString("/a/b"); 42 final Fqn FQN1 = NODE; 43 final Fqn FQN2 = Fqn.fromString("/1/2/3"); 44 final Log log = LogFactory.getLog(DeadlockTest.class); 45 46 47 public DeadlockTest(String name) 48 { 49 super(name); 50 } 51 52 public void setUp() throws Exception 53 { 54 super.setUp(); 55 DummyTransactionManager.getInstance(); 56 cache = new CacheImpl(); 57 cache.getConfiguration().setInitialStateRetrievalTimeout(10000); 58 cache.getConfiguration().setClusterName("test"); 59 cache.getConfiguration().setCacheMode(Configuration.CacheMode.LOCAL); 60 cache.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup"); 61 cache.getConfiguration().setIsolationLevel(IsolationLevel.REPEATABLE_READ); 62 cache.getConfiguration().setLockAcquisitionTimeout(3000); 63 cache.create(); 64 cache.start(); 65 thread_ex = null; 66 } 67 68 69 public void tearDown() throws Exception 70 { 71 super.tearDown(); 72 if (cache != null) 73 { 74 cache.stop(); 75 } 76 if (thread_ex != null) 77 { 78 throw thread_ex; 79 } 80 } 81 82 83 public void testConcurrentUpgrade() throws CacheException, InterruptedException 84 { 85 MyThread t1 = new MyThreadTimeout("MyThread#1", NODE); 86 MyThread t2 = new MyThread("MyThread#2", NODE); 87 88 cache.put(NODE, null); 89 90 t1.start(); 91 t2.start(); 92 93 TestingUtil.sleepThread((long) 5000); 94 95 synchronized (t1) 96 { 97 t1.notify(); } 99 100 TestingUtil.sleepThread((long) 5000); 101 102 synchronized (t2) 103 { 104 t2.notify(); } 106 107 t1.join(); 108 t2.join(); 109 } 110 111 112 116 public void testPutDeadlock() throws CacheException, InterruptedException 117 { 118 MyPutter t1 = new MyPutterTimeout("MyPutter#1", FQN1, FQN2); 119 MyPutter t2 = new MyPutter("MyPutter#2", FQN2, FQN1); 120 121 cache.put(FQN1, null); 122 cache.put(FQN2, null); 123 124 t1.start(); 125 t2.start(); 126 127 TestingUtil.sleepThread((long) 1000); 128 129 synchronized (t1) 130 { 131 t1.notify(); } 133 134 TestingUtil.sleepThread((long) 1000); 135 136 synchronized (t2) 137 { 138 t2.notify(); } 140 141 t1.join(); 142 t2.join(); 143 } 144 145 213 214 public void testMoreThanOneUpgrader() throws Exception 215 { 216 final int NUM = 2; 217 final Object lock = new Object (); 218 219 cache.put(NODE, "bla", "blo"); 220 221 MyUpgrader[] upgraders = new MyUpgrader[NUM]; 222 for (int i = 0; i < upgraders.length; i++) 223 { 224 upgraders[i] = new MyUpgrader("Upgrader#" + i, NODE, lock); 225 upgraders[i].start(); 226 } 227 228 TestingUtil.sleepThread((long) 1000); 229 log("locks: " + cache.printLockInfo()); 230 231 synchronized (lock) 232 { 233 lock.notifyAll(); 234 } 235 236 for (int i = 0; i < upgraders.length; i++) 238 { 239 MyThread upgrader = upgraders[i]; 240 upgrader.join(); 241 } 242 } 243 244 245 public void testPutsAndRemovesOnParentAndChildNodes() throws InterruptedException 246 { 247 ContinuousPutter putter = new ContinuousPutter("Putter", NODE); 248 ContinuousRemover remover = new ContinuousRemover("Remover", PARENT_NODE); 249 putter.start(); 250 remover.start(); 251 TestingUtil.sleepThread((long) 5000); 252 log("stopping Putter"); 253 putter.looping = false; 254 log("stopping Remover"); 255 remover.looping = false; 256 putter.join(); 257 remover.join(); 258 } 259 260 public void testPutsAndRemovesOnParentAndChildNodesReversed() throws InterruptedException 261 { 262 ContinuousPutter putter = new ContinuousPutter("Putter", PARENT_NODE); 263 ContinuousRemover remover = new ContinuousRemover("Remover", NODE); 264 putter.start(); 265 remover.start(); 266 TestingUtil.sleepThread((long) 5000); 267 log("stopping Putter"); 268 putter.looping = false; 269 log("stopping Remover"); 270 remover.looping = false; 271 putter.join(); 272 remover.join(); 273 } 274 275 public void testPutsAndRemovesOnSameNode() throws InterruptedException 276 { 277 ContinuousPutter putter = new ContinuousPutter("Putter", NODE); 278 ContinuousRemover remover = new ContinuousRemover("Remover", NODE); 279 putter.start(); 280 remover.start(); 281 TestingUtil.sleepThread((long) 5000); 282 log("stopping Putter"); 283 putter.looping = false; 284 log("stopping Remover"); 285 remover.looping = false; 286 putter.join(); 287 remover.join(); 288 } 289 290 291 class GenericThread extends Thread 292 { 293 protected Transaction tx; 294 protected boolean looping = true; 295 296 public GenericThread() 297 { 298 299 } 300 301 public GenericThread(String name) 302 { 303 super(name); 304 } 305 306 public void setLooping(boolean looping) 307 { 308 this.looping = looping; 309 } 310 311 public void run() 312 { 313 try 314 { 315 _run(); 316 } 317 catch (Exception t) 318 { 319 System.out.println(getName() + ": " + t); 320 if (thread_ex == null) 321 { 322 thread_ex = t; 323 } 324 } 325 if (log.isTraceEnabled()) 326 { 327 log.trace("Thread " + getName() + " terminated"); 328 } 329 } 330 331 protected void _run() throws Exception 332 { 333 throw new UnsupportedOperationException (); 334 } 335 } 336 337 338 class ContinuousRemover extends GenericThread 339 { 340 Fqn fqn; 341 342 public ContinuousRemover(String name, Fqn fqn) 343 { 344 super(name); 345 this.fqn = fqn; 346 } 347 348 349 protected void _run() throws Exception 350 { 351 while (thread_ex == null && looping) 352 { 353 try 354 { 355 if (interrupted()) 356 { 357 break; 358 } 359 tx = startTransaction(); 360 log("remove(" + fqn + ")"); 361 cache.remove(fqn); 362 sleep(random(20)); 363 tx.commit(); 364 } 365 catch (InterruptedException interrupted) 366 { 367 tx.rollback(); 368 break; 369 } 370 catch (Exception ex) 371 { 372 tx.rollback(); 373 throw ex; 374 } 375 } 376 } 377 } 378 379 class ContinuousPutter extends GenericThread 380 { 381 Fqn fqn; 382 383 public ContinuousPutter(String name, Fqn fqn) 384 { 385 super(name); 386 this.fqn = fqn; 387 } 388 389 390 protected void _run() throws Exception 391 { 392 while (thread_ex == null && looping) 393 { 394 try 395 { 396 if (interrupted()) 397 { 398 break; 399 } 400 tx = startTransaction(); 401 log("put(" + fqn + ")"); 402 cache.put(fqn, "foo", "bar"); 403 sleep(random(20)); 404 tx.commit(); 405 } 406 catch (InterruptedException interrupted) 407 { 408 tx.rollback(); 409 break; 410 } 411 catch (Exception ex) 412 { 413 tx.rollback(); 414 throw ex; 415 } 416 } 417 } 418 } 419 420 public static long random(long range) 421 { 422 return (long) ((Math.random() * 100000) % range) + 1; 423 } 424 425 426 class MyThread extends GenericThread 427 { 428 Fqn fqn; 429 430 431 public MyThread(String name, Fqn fqn) 432 { 433 super(name); 434 this.fqn = fqn; 435 } 436 437 protected void _run() throws Exception 438 { 439 tx = startTransaction(); 440 log("get(" + fqn + ")"); 441 cache.get(fqn, "bla"); log("done, locks: " + cache.printLockInfo()); 443 444 synchronized (this) {wait();} 445 446 log("put(" + fqn + ")"); 447 cache.put(fqn, "key", "val"); log("done, locks: " + cache.printLockInfo()); 449 tx.commit(); 450 log("committed TX, locks: " + cache.printLockInfo()); 451 } 452 } 453 454 455 class MyUpgrader extends MyThread 456 { 457 Object lock; 458 459 public MyUpgrader(String name, Fqn fqn) 460 { 461 super(name, fqn); 462 } 463 464 public MyUpgrader(String name, Fqn fqn, Object lock) 465 { 466 super(name, fqn); 467 this.lock = lock; 468 } 469 470 protected void _run() throws Exception 471 { 472 tx = startTransaction(); 473 try 474 { 475 log("get(" + fqn + ")"); 476 477 cache.get(fqn, "bla"); 479 synchronized (lock) {lock.wait();} 480 481 log("put(" + fqn + ")"); 482 cache.put(fqn, "key", "val"); log("done, locks: " + cache.printLockInfo()); 484 tx.commit(); 485 log("committed TX, locks: " + cache.printLockInfo()); 486 } 487 catch (UpgradeException upge) 488 { 489 log("Exception upgrading lock"); 490 tx.rollback(); 491 } 492 } 493 } 494 495 class MyThreadTimeout extends MyThread 496 { 497 498 public MyThreadTimeout(String name, Fqn fqn) 499 { 500 super(name, fqn); 501 } 502 503 protected void _run() throws Exception 504 { 505 try 506 { 507 super._run(); 508 } 509 catch (UpgradeException upgradeEx) 510 { 511 log("received UpgradeException as expected"); 512 tx.rollback(); 513 log("rolled back TX, locks: " + cache.printLockInfo()); 514 } 515 catch (TimeoutException timeoutEx) 516 { 517 log("received TimeoutException as expected"); 518 tx.rollback(); 519 log("rolled back TX, locks: " + cache.printLockInfo()); 520 } 521 } 522 } 523 524 525 class MyPutter extends GenericThread 526 { 527 Fqn fqn1, fqn2; 528 529 public MyPutter(String name, Fqn fqn1, Fqn fqn2) 530 { 531 super(name); 532 this.fqn1 = fqn1; 533 this.fqn2 = fqn2; 534 } 535 536 protected void _run() throws Exception 537 { 538 tx = startTransaction(); 539 log("put(" + fqn1 + ")"); 540 cache.put(fqn1, "key", "val"); log("done, locks: " + cache.printLockInfo()); 542 synchronized (this) {wait();} 543 log("put(" + fqn2 + ")"); 544 cache.put(fqn2, "key", "val"); log("done, locks: " + cache.printLockInfo()); 546 tx.commit(); 547 log("committed TX, locks: " + cache.printLockInfo()); 548 } 549 } 550 551 class MyPutterTimeout extends MyPutter 552 { 553 554 public MyPutterTimeout(String name, Fqn fqn1, Fqn fqn2) 555 { 556 super(name, fqn1, fqn2); 557 } 558 559 protected void _run() throws Exception 560 { 561 try 562 { 563 super._run(); 564 } 565 catch (TimeoutException timeoutEx) 566 { 567 log("received TimeoutException as expected"); 568 tx.rollback(); 569 log("rolled back TX, locks: " + cache.printLockInfo()); 570 } 571 } 572 } 573 574 575 private static void log(String msg) 576 { 577 System.out.println(Thread.currentThread().getName() + ": " + msg); 578 } 579 580 581 Transaction startTransaction() throws SystemException , NotSupportedException 582 { 583 DummyTransactionManager mgr = DummyTransactionManager.getInstance(); 584 mgr.begin(); 585 Transaction tx = mgr.getTransaction(); 586 return tx; 587 } 588 589 590 public static Test suite() throws Exception 591 { 592 return new TestSuite(DeadlockTest.class); 593 } 594 595 public static void main(String [] args) throws Exception 596 { 597 junit.textui.TestRunner.run(suite()); 598 } 599 600 601 } 602 | Popular Tags |