1 4 package com.tc.objectserver.lockmanager.impl; 5 6 import org.apache.commons.io.output.NullOutputStream; 7 8 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 9 10 import com.tc.exception.ImplementMe; 11 import com.tc.net.TCSocketAddress; 12 import com.tc.net.protocol.NetworkStackID; 13 import com.tc.net.protocol.TCNetworkMessage; 14 import com.tc.net.protocol.tcm.ChannelEventListener; 15 import com.tc.net.protocol.tcm.ChannelID; 16 import com.tc.net.protocol.tcm.MessageChannel; 17 import com.tc.net.protocol.tcm.TCMessage; 18 import com.tc.net.protocol.tcm.TCMessageType; 19 import com.tc.object.lockmanager.api.LockID; 20 import com.tc.object.lockmanager.api.LockLevel; 21 import com.tc.object.lockmanager.api.ServerThreadID; 22 import com.tc.object.lockmanager.api.ThreadID; 23 import com.tc.object.tx.WaitInvocation; 24 import com.tc.objectserver.api.TestSink; 25 import com.tc.objectserver.context.LockResponseContext; 26 import com.tc.objectserver.lockmanager.api.DeadlockChain; 27 import com.tc.objectserver.lockmanager.api.DeadlockResults; 28 import com.tc.objectserver.lockmanager.api.LockHolder; 29 import com.tc.objectserver.lockmanager.api.LockMBean; 30 import com.tc.objectserver.lockmanager.api.NullChannelManager; 31 import com.tc.objectserver.lockmanager.api.ServerLockRequest; 32 import com.tc.objectserver.lockmanager.api.Waiter; 33 import com.tc.util.concurrent.ThreadUtil; 34 35 import java.io.IOException ; 36 import java.io.ObjectOutputStream ; 37 import java.net.UnknownHostException ; 38 import java.util.ArrayList ; 39 import java.util.Arrays ; 40 import java.util.Comparator ; 41 import java.util.HashMap ; 42 import java.util.Iterator ; 43 import java.util.List ; 44 import java.util.Map ; 45 import java.util.Random ; 46 47 import junit.framework.TestCase; 48 49 52 public class LockManagerTest extends TestCase { 53 private TestSink sink; 54 private LockManagerImpl lockManager; 55 private Random random = new Random (); 56 57 final int numLocks = 30; 58 final int numThreads = 15; 59 private LockID[] locks = makeUniqueLocks(numLocks); 60 private ServerThreadID[] txns = makeUniqueTxns(numThreads); 61 62 protected void setUp() throws Exception { 63 super.setUp(); 64 resetLockManager(); 65 sink = new TestSink(); 66 } 67 68 private void resetLockManager() { 69 resetLockManager(false); 70 } 71 72 private void resetLockManager(boolean start) { 73 if (lockManager != null) { 74 try { 75 lockManager.stop(); 76 } catch (InterruptedException e) { 77 fail(); 78 } 79 } 80 81 lockManager = new LockManagerImpl(new NullChannelManager()); 82 lockManager.setLockPolicy(LockManagerImpl.ALTRUISTIC_LOCK_POLICY); 83 if (start) { 84 lockManager.start(); 85 } 86 } 87 88 protected void tearDown() throws Exception { 89 assertEquals(0, lockManager.getLockCount()); 90 assertEquals(0, lockManager.getThreadContextCount()); 91 super.tearDown(); 92 } 93 94 public void testLockMBean() throws IOException { 95 96 final MessageChannel channel = new TestMessageChannel(); 97 98 final long start = System.currentTimeMillis(); 99 final ChannelID cid1 = new ChannelID(1); 100 ChannelID cid2 = new ChannelID(2); 101 LockID lid1 = new LockID("1"); 102 LockID lid2 = new LockID("2"); 103 LockID lid3 = new LockID("3"); 104 ThreadID tid1 = new ThreadID(1); 105 WaitInvocation wait = new WaitInvocation(Integer.MAX_VALUE); 106 107 lockManager = new LockManagerImpl(new NullChannelManager() { 108 public MessageChannel getChannel(ChannelID id) { 109 if (cid1.equals(id)) { return channel; } 110 return null; 111 } 112 113 public String getChannelAddress(ChannelID channelID) { 114 if (cid1.equals(channelID)) { return "127.0.0.1:6969"; } 115 return "no longer connected"; 116 } 117 }); 118 119 lockManager.setLockPolicy(LockManagerImpl.ALTRUISTIC_LOCK_POLICY); 120 lockManager.start(); 121 122 lockManager.requestLock(lid1, cid1, tid1, LockLevel.WRITE, sink); lockManager.requestLock(lid1, cid2, tid1, LockLevel.WRITE, sink); 125 lockManager.requestLock(lid2, cid1, tid1, LockLevel.READ, sink); lockManager.requestLock(lid2, cid2, tid1, LockLevel.READ, sink); lockManager.requestLock(lid2, cid1, tid1, LockLevel.WRITE, sink); 129 lockManager.requestLock(lid3, cid1, tid1, LockLevel.WRITE, sink); lockManager.wait(lid3, cid1, tid1, wait, sink); 132 LockMBean[] lockBeans = lockManager.getAllLocks(); 133 assertEquals(3, lockBeans.length); 134 sortLocksByID(lockBeans); 135 136 LockMBean bean1 = lockBeans[0]; 137 LockMBean bean2 = lockBeans[1]; 138 LockMBean bean3 = lockBeans[2]; 139 testSerialize(bean1); 140 testSerialize(bean2); 141 testSerialize(bean3); 142 143 validateBean1(bean1, start); 144 validateBean2(bean2, start); 145 validateBean3(bean3, start, wait); 146 147 lockManager.clearAllLocksFor(cid1); 148 lockManager.clearAllLocksFor(cid2); 149 } 150 151 private void validateBean3(LockMBean bean3, long time, WaitInvocation wait) { 152 LockHolder[] holders = bean3.getHolders(); 153 ServerLockRequest[] reqs = bean3.getPendingRequests(); 154 ServerLockRequest[] upgrades = bean3.getPendingUpgrades(); 155 Waiter[] waiters = bean3.getWaiters(); 156 assertEquals(0, holders.length); 157 assertEquals(0, reqs.length); 158 assertEquals(0, upgrades.length); 159 assertEquals(1, waiters.length); 160 161 Waiter waiter = waiters[0]; 162 assertEquals(wait.toString(), waiter.getWaitInvocation()); 163 assertTrue(waiter.getStartTime() >= time); 164 assertEquals(new ChannelID(1), waiter.getChannelID()); 165 assertEquals("127.0.0.1:6969", waiter.getChannelAddr()); 166 assertEquals(new ThreadID(1), waiter.getThreadID()); 167 } 168 169 private void validateBean2(LockMBean bean2, long time) { 170 LockHolder[] holders = bean2.getHolders(); 171 ServerLockRequest[] reqs = bean2.getPendingRequests(); 172 ServerLockRequest[] upgrades = bean2.getPendingUpgrades(); 173 Waiter[] waiters = bean2.getWaiters(); 174 assertEquals(2, holders.length); 175 assertEquals(0, reqs.length); 176 assertEquals(1, upgrades.length); 177 assertEquals(0, waiters.length); 178 179 LockHolder holder = holders[0]; 180 assertEquals(LockLevel.toString(LockLevel.READ), holder.getLockLevel()); 181 assertTrue(holder.getTimeAcquired() >= time); 182 assertEquals(new ChannelID(1), holder.getChannelID()); 183 assertEquals("127.0.0.1:6969", holder.getChannelAddr()); 184 assertEquals(new ThreadID(1), holder.getThreadID()); 185 186 holder = holders[1]; 187 assertEquals(LockLevel.toString(LockLevel.READ), holder.getLockLevel()); 188 assertTrue(holder.getTimeAcquired() >= time); 189 assertEquals(new ChannelID(2), holder.getChannelID()); 190 assertEquals("no longer connected", holder.getChannelAddr()); 191 assertEquals(new ThreadID(1), holder.getThreadID()); 192 193 ServerLockRequest up = upgrades[0]; 194 assertEquals(LockLevel.toString(LockLevel.WRITE), up.getLockLevel()); 195 assertTrue(up.getRequestTime() >= time); 196 assertEquals(new ChannelID(1), up.getChannelID()); 197 assertEquals("127.0.0.1:6969", up.getChannelAddr()); 198 assertEquals(new ThreadID(1), up.getThreadID()); 199 } 200 201 private void validateBean1(LockMBean bean1, long time) { 202 LockHolder[] holders = bean1.getHolders(); 203 ServerLockRequest[] reqs = bean1.getPendingRequests(); 204 ServerLockRequest[] upgrades = bean1.getPendingUpgrades(); 205 Waiter[] waiters = bean1.getWaiters(); 206 assertEquals(1, holders.length); 207 assertEquals(1, reqs.length); 208 assertEquals(0, upgrades.length); 209 assertEquals(0, waiters.length); 210 211 LockHolder holder = holders[0]; 212 assertEquals(LockLevel.toString(LockLevel.WRITE), holder.getLockLevel()); 213 assertTrue(holder.getTimeAcquired() >= time); 214 assertEquals(new ChannelID(1), holder.getChannelID()); 215 assertEquals(new ThreadID(1), holder.getThreadID()); 216 assertEquals("127.0.0.1:6969", holder.getChannelAddr()); 217 218 ServerLockRequest req = reqs[0]; 219 assertEquals(LockLevel.toString(LockLevel.WRITE), req.getLockLevel()); 220 assertTrue(req.getRequestTime() >= time); 221 assertEquals(new ChannelID(2), req.getChannelID()); 222 assertEquals("no longer connected", req.getChannelAddr()); 223 assertEquals(new ThreadID(1), req.getThreadID()); 224 } 225 226 private void testSerialize(Object o) throws IOException { 227 ObjectOutputStream oos = new ObjectOutputStream (new NullOutputStream()); 228 oos.writeObject(o); 229 oos.close(); 230 } 231 232 private void sortLocksByID(LockMBean[] lockBeans) { 233 Arrays.sort(lockBeans, new Comparator () { 234 public int compare(Object o1, Object o2) { 235 LockMBean l1 = (LockMBean) o1; 236 LockMBean l2 = (LockMBean) o2; 237 238 String id1 = l1.getLockName(); 239 String id2 = l2.getLockName(); 240 241 return id1.compareTo(id2); 242 } 243 }); 244 } 245 246 public void testReestablishWait() throws Exception { 247 LockID lockID1 = new LockID("my lock"); 248 ChannelID channel1 = new ChannelID(1); 249 ThreadID tx1 = new ThreadID(1); 250 ThreadID tx2 = new ThreadID(2); 251 252 try { 253 assertEquals(0, lockManager.getLockCount()); 254 long waitTime = 1000; 255 WaitInvocation waitCall = new WaitInvocation(waitTime); 256 TestSink responseSink = new TestSink(); 257 long t0 = System.currentTimeMillis(); 258 lockManager.reestablishWait(lockID1, channel1, tx1, LockLevel.WRITE, waitCall, responseSink); 259 lockManager.reestablishWait(lockID1, channel1, tx2, LockLevel.WRITE, waitCall, responseSink); 260 lockManager.start(); 261 262 LockResponseContext ctxt = (LockResponseContext) responseSink.waitForAdd(waitTime * 3); 263 assertTrue(ctxt != null); 264 assertTrue(System.currentTimeMillis() - t0 >= waitTime); 265 assertResponseContext(lockID1, channel1, tx1, LockLevel.WRITE, ctxt); 266 assertTrue(ctxt.isLockWaitTimeout()); 267 ThreadUtil.reallySleep(waitTime * 3); 268 assertEquals(3, responseSink.size()); ctxt = (LockResponseContext) responseSink.take(); 270 LockResponseContext ctxt1 = (LockResponseContext) responseSink.take(); 271 LockResponseContext ctxt2 = (LockResponseContext) responseSink.take(); 272 assertTrue((ctxt1.isLockAward() && ctxt2.isLockWaitTimeout()) 273 || (ctxt2.isLockAward() && ctxt1.isLockWaitTimeout())); 274 275 } finally { 276 lockManager = null; 277 resetLockManager(); 278 } 279 } 280 281 public void testReestablishLockAfterReestablishWait() throws Exception { 282 LockID lockID1 = new LockID("my lock"); 283 ChannelID channel1 = new ChannelID(1); 284 ThreadID tx1 = new ThreadID(1); 285 ThreadID tx2 = new ThreadID(2); 286 int requestedLevel = LockLevel.WRITE; 287 WaitInvocation waitCall = new WaitInvocation(); 288 try { 289 TestSink responseSink = new TestSink(); 290 assertEquals(0, lockManager.getLockCount()); 291 lockManager.reestablishWait(lockID1, channel1, tx1, LockLevel.WRITE, waitCall, responseSink); 292 assertEquals(1, lockManager.getLockCount()); 293 assertEquals(0, responseSink.getInternalQueue().size()); 294 295 try { 297 lockManager.reestablishLock(lockID1, channel1, tx1, requestedLevel, responseSink); 298 fail("Should have thrown an AssertionError."); 299 } catch (AssertionError e) { 300 } 302 assertEquals(1, lockManager.getLockCount()); 305 lockManager.reestablishLock(lockID1, channel1, tx2, requestedLevel, responseSink); 306 } finally { 307 lockManager = null; 308 resetLockManager(); 309 } 310 } 311 312 public void testReestablishReadLock() throws Exception { 313 LockID lockID1 = new LockID("my lock"); 314 ChannelID channel1 = new ChannelID(1); 315 ThreadID tx1 = new ThreadID(1); 316 ThreadID tx2 = new ThreadID(2); 317 ThreadID tx3 = new ThreadID(3); 318 int requestedLevel = LockLevel.READ; 319 320 try { 321 TestSink responseSink = new TestSink(); 322 assertEquals(0, lockManager.getLockCount()); 323 324 lockManager.reestablishLock(lockID1, channel1, tx1, requestedLevel, responseSink); 325 assertEquals(1, lockManager.getLockCount()); 326 327 lockManager.reestablishLock(lockID1, channel1, tx2, requestedLevel, responseSink); 330 assertEquals(1, lockManager.getLockCount()); 331 332 try { 334 lockManager.reestablishLock(lockID1, channel1, tx3, LockLevel.WRITE, responseSink); 335 fail("Should have thrown a LockManagerError."); 336 } catch (AssertionError e) { 337 } 339 340 } finally { 341 lockManager = null; 343 resetLockManager(); 344 } 345 346 try { 347 TestSink responseSink = new TestSink(); 348 assertEquals(0, lockManager.getLockCount()); 349 lockManager.reestablishLock(lockID1, channel1, tx1, LockLevel.WRITE, responseSink); 350 assertEquals(1, lockManager.getLockCount()); 351 352 responseSink = new TestSink(); 354 try { 355 lockManager.reestablishLock(lockID1, channel1, tx2, LockLevel.READ, responseSink); 356 fail("Should have thrown a LockManagerError"); 357 } catch (AssertionError e) { 358 } 360 361 } finally { 362 lockManager = null; 363 resetLockManager(); 364 } 365 } 366 367 public void testReestablishWriteLock() throws Exception { 368 369 LockID lockID1 = new LockID("my lock"); 370 LockID lockID2 = new LockID("my other lock"); 371 ChannelID channel1 = new ChannelID(1); 372 ChannelID channel2 = new ChannelID(2); 373 ThreadID tx1 = new ThreadID(1); 374 ThreadID tx2 = new ThreadID(2); 375 int requestedLevel = LockLevel.WRITE; 376 377 try { 378 TestSink responseSink = new TestSink(); 379 assertEquals(0, lockManager.getLockCount()); 380 lockManager.reestablishLock(lockID1, channel1, tx1, requestedLevel, responseSink); 381 assertEquals(1, lockManager.getLockCount()); 382 383 try { 384 lockManager.reestablishLock(lockID1, channel2, tx2, requestedLevel, responseSink); 385 fail("Expected a LockManagerError!"); 386 } catch (AssertionError e) { 387 } 389 390 lockManager.reestablishLock(lockID2, channel1, tx1, requestedLevel, responseSink); 392 393 lockManager.start(); 394 try { 397 lockManager.reestablishLock(lockID1, channel1, tx1, requestedLevel, null); 398 fail("Should have thrown a LockManagerError"); 399 } catch (Error e) { 400 } 402 403 } finally { 404 lockManager = null; 406 resetLockManager(); 407 } 408 } 409 410 417 private void assertResponseContext(LockID lockID, ChannelID channel, ThreadID tx1, int requestedLevel, 418 LockResponseContext ctxt) { 419 assertEquals(lockID, ctxt.getLockID()); 420 assertEquals(channel, ctxt.getChannelID()); 421 assertEquals(tx1, ctxt.getThreadID()); 422 assertEquals(requestedLevel, ctxt.getLockLevel()); 423 } 424 425 public void testWaitTimeoutsIgnoredDuringStartup() throws Exception { 426 LockID lockID = new LockID("my lcok"); 427 ChannelID channel1 = new ChannelID(1); 428 ThreadID tx1 = new ThreadID(1); 429 try { 430 long waitTime = 1000; 431 WaitInvocation waitInvocation = new WaitInvocation(waitTime); 432 TestSink responseSink = new TestSink(); 433 lockManager.reestablishWait(lockID, channel1, tx1, LockLevel.WRITE, waitInvocation, responseSink); 434 435 LockResponseContext ctxt = (LockResponseContext) responseSink.waitForAdd(waitTime * 2); 436 assertNull(ctxt); 437 438 lockManager.start(); 439 ctxt = (LockResponseContext) responseSink.waitForAdd(0); 440 assertNotNull(ctxt); 441 } finally { 442 lockManager = null; 443 resetLockManager(); 444 } 445 } 446 447 public void testWaitTimeoutsIgnoredDuringShutdown() throws InterruptedException { 448 449 ChannelID channelID = new ChannelID(1); 450 LockID lockID = new LockID("1"); 451 ThreadID txID = new ThreadID(1); 452 453 lockManager.start(); 454 boolean granted = lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink); 455 assertTrue(granted); 456 457 lockManager.wait(lockID, channelID, txID, new WaitInvocation(1000), sink); 458 lockManager.stop(); 459 460 assertFalse(lockManager.hasPending(lockID)); 461 assertEquals(0, lockManager.getLockCount()); 462 463 ThreadUtil.reallySleep(1500); 464 465 assertFalse(lockManager.hasPending(lockID)); 466 assertEquals(0, lockManager.getLockCount()); 467 } 468 469 public void testOffBlocksUntilNoOutstandingLocksViaWait() throws Exception { 470 if (true) return; 472 List queue = sink.getInternalQueue(); 473 ChannelID channelID = new ChannelID(1); 474 LockID lockID = new LockID("1"); 475 ThreadID txID = new ThreadID(1); 476 477 final LinkedQueue shutdownSteps = new LinkedQueue(); 478 ShutdownThread shutdown = new ShutdownThread(shutdownSteps); 479 480 try { 481 lockManager.start(); 482 lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink); 483 assertEquals(1, queue.size()); 484 485 shutdown.start(); 486 shutdownSteps.take(); 487 ThreadUtil.reallySleep(500); 488 assertTrue(shutdownSteps.peek() == null); 490 491 lockManager.wait(lockID, channelID, new ThreadID(1), new WaitInvocation(), sink); 495 shutdownSteps.take(); 496 497 } finally { 498 lockManager.clearAllLocksFor(channelID); 499 } 500 } 501 502 public void testOffDoesNotBlockUntilNoOutstandingLocksViaUnlock() throws Exception { 503 List queue = sink.getInternalQueue(); 504 ChannelID channel1 = new ChannelID(1); 505 LockID lock1 = new LockID("1"); 506 ThreadID tx1 = new ThreadID(1); 507 508 final LinkedQueue shutdownSteps = new LinkedQueue(); 509 ShutdownThread shutdown = new ShutdownThread(shutdownSteps); 510 try { 511 lockManager.start(); 512 lockManager.requestLock(lock1, channel1, tx1, LockLevel.WRITE, sink); 513 assertEquals(1, queue.size()); 514 515 shutdown.start(); 516 shutdownSteps.take(); 517 ThreadUtil.reallySleep(1000); 518 shutdownSteps.take(); 519 } finally { 520 lockManager = null; 521 resetLockManager(); 522 } 523 } 524 525 public void testOffCancelsWaits() throws Exception { 526 } 528 529 public void testOffStopsGrantingNewLocks() throws Exception { 530 List queue = sink.getInternalQueue(); 531 ChannelID channelID = new ChannelID(1); 532 LockID lockID = new LockID("1"); 533 ThreadID txID = new ThreadID(1); 534 try { 535 lockManager.start(); 537 lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink); 538 assertEquals(1, queue.size()); 539 queue.clear(); 540 lockManager.unlock(lockID, channelID, txID); 541 lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink); 542 assertEquals(1, queue.size()); 543 lockManager.unlock(lockID, channelID, txID); 544 545 queue.clear(); 548 lockManager.stop(); 549 lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink); 550 assertEquals(0, queue.size()); 551 } finally { 552 lockManager.clearAllLocksFor(channelID); 553 } 554 } 555 556 public void testRequestDoesntGrantPendingLocks() throws Exception { 557 List queue = sink.getInternalQueue(); 558 ChannelID channelID = new ChannelID(1); 559 LockID lockID = new LockID("1"); 560 ThreadID txID = new ThreadID(1); 561 562 try { 563 lockManager.start(); 564 lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink); 567 queue.clear(); 568 lockManager.requestLock(lockID, channelID, new ThreadID(2), LockLevel.WRITE, sink); 569 assertEquals(0, queue.size()); 571 } finally { 572 lockManager = null; 573 resetLockManager(); 574 } 575 } 576 577 public void testUnlockIgnoredDuringShutdown() throws Exception { 578 List queue = sink.getInternalQueue(); 579 ChannelID channelID = new ChannelID(1); 580 LockID lockID = new LockID("1"); 581 ThreadID txID = new ThreadID(1); 582 try { 583 lockManager.start(); 584 lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink); 587 queue.clear(); 588 lockManager.requestLock(lockID, channelID, new ThreadID(2), LockLevel.WRITE, sink); 589 assertEquals(0, queue.size()); 591 592 lockManager.stop(); 593 594 lockManager.unlock(lockID, channelID, txID); 596 assertEquals(0, queue.size()); 598 599 } finally { 600 lockManager = null; 601 resetLockManager(); 602 } 603 } 604 605 public void testLockManagerBasics() { 606 LockID l1 = new LockID("1"); 607 ChannelID c1 = new ChannelID(1); 608 ThreadID s1 = new ThreadID(0); 609 610 ChannelID c2 = new ChannelID(2); 611 ChannelID c3 = new ChannelID(3); 612 ChannelID c4 = new ChannelID(4); 613 lockManager.start(); 614 lockManager.requestLock(l1, c1, s1, LockLevel.WRITE, sink); 615 assertTrue(sink.size() == 1); 616 System.out.println(sink.getInternalQueue().remove(0)); 617 618 lockManager.requestLock(l1, c2, s1, LockLevel.WRITE, sink); 619 assertTrue(sink.size() == 0); 620 lockManager.unlock(l1, c1, s1); 621 assertTrue(sink.size() == 1); 622 System.out.println(sink.getInternalQueue().remove(0)); 623 624 lockManager.requestLock(l1, c3, s1, LockLevel.READ, sink); 625 assertTrue(sink.size() == 0); 626 assertTrue(lockManager.hasPending(l1)); 627 lockManager.unlock(l1, c2, s1); 628 assertTrue(sink.size() == 1); 629 assertFalse(lockManager.hasPending(l1)); 630 631 lockManager.requestLock(l1, c4, s1, LockLevel.WRITE, sink); 632 assertTrue(lockManager.hasPending(l1)); 633 lockManager.unlock(l1, c2, s1); 634 assertTrue(lockManager.hasPending(l1)); 635 lockManager.unlock(l1, c3, s1); 636 assertFalse(lockManager.hasPending(l1)); 637 lockManager.unlock(l1, c4, s1); 638 } 639 640 public void testDeadLock1() { 641 644 LockID l1 = new LockID("1"); 645 LockID l2 = new LockID("2"); 646 ChannelID c1 = new ChannelID(1); 647 648 ThreadID s1 = new ThreadID(1); 649 ThreadID s2 = new ThreadID(2); 650 651 ServerThreadID thread1 = new ServerThreadID(c1, s1); 652 ServerThreadID thread2 = new ServerThreadID(c1, s2); 653 654 lockManager.start(); 655 lockManager.requestLock(l1, c1, s1, LockLevel.WRITE, sink); 657 lockManager.requestLock(l2, c1, s2, LockLevel.WRITE, sink); 659 lockManager.requestLock(l2, c1, s1, LockLevel.WRITE, sink); 661 lockManager.requestLock(l1, c1, s2, LockLevel.WRITE, sink); 663 664 TestDeadlockResults deadlocks = new TestDeadlockResults(); 665 lockManager.scanForDeadlocks(deadlocks); 666 667 assertEquals(1, deadlocks.chains.size()); 668 Map check = new HashMap (); 669 check.put(thread1, l2); 670 check.put(thread2, l1); 671 assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0), check); 672 673 DeadlockChain[] results = lockManager.scanForDeadlocks(); 675 assertEquals(1, results.length); 676 check = new HashMap (); 677 check.put(thread1, l2); 678 check.put(thread2, l1); 679 assertSpecificDeadlock(results[0], check); 680 681 lockManager.clearAllLocksFor(c1); 682 } 683 684 public void testDeadLock3() { 685 687 LockID l1 = new LockID("1"); 689 LockID l2 = new LockID("2"); 690 691 LockID l3 = new LockID("3"); 693 LockID l4 = new LockID("4"); 694 LockID l5 = new LockID("5"); 695 696 ChannelID c1 = new ChannelID(1); 697 ThreadID s1 = new ThreadID(1); 698 ThreadID s2 = new ThreadID(2); 699 700 ServerThreadID thread1 = new ServerThreadID(c1, s1); 701 ServerThreadID thread2 = new ServerThreadID(c1, s2); 702 703 lockManager.start(); 704 705 lockManager.requestLock(l3, c1, s1, LockLevel.READ, sink); 707 lockManager.requestLock(l4, c1, s1, LockLevel.READ, sink); 708 lockManager.requestLock(l5, c1, s1, LockLevel.READ, sink); 709 lockManager.requestLock(l3, c1, s2, LockLevel.READ, sink); 710 lockManager.requestLock(l4, c1, s2, LockLevel.READ, sink); 711 712 lockManager.requestLock(l1, c1, s1, LockLevel.WRITE, sink); 714 lockManager.requestLock(l2, c1, s2, LockLevel.WRITE, sink); 716 lockManager.requestLock(l2, c1, s1, LockLevel.WRITE, sink); 718 lockManager.requestLock(l1, c1, s2, LockLevel.WRITE, sink); 720 721 TestDeadlockResults deadlocks = new TestDeadlockResults(); 722 lockManager.scanForDeadlocks(deadlocks); 723 724 assertEquals(1, deadlocks.chains.size()); 725 Map check = new HashMap (); 726 check.put(thread1, l2); 727 check.put(thread2, l1); 728 assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0), check); 729 730 lockManager.clearAllLocksFor(c1); 731 } 732 733 public void testUpgradeDeadLock() { 734 LockID l1 = new LockID("L1"); 736 737 ChannelID c0 = new ChannelID(0); 738 ThreadID s1 = new ThreadID(1); 739 ThreadID s2 = new ThreadID(2); 740 741 ServerThreadID thread1 = new ServerThreadID(c0, s1); 742 ServerThreadID thread2 = new ServerThreadID(c0, s2); 743 744 lockManager.start(); 745 746 lockManager.requestLock(l1, c0, s1, LockLevel.READ, sink); 748 lockManager.requestLock(l1, c0, s2, LockLevel.READ, sink); 750 751 lockManager.requestLock(l1, c0, s1, LockLevel.WRITE, sink); 753 lockManager.requestLock(l1, c0, s2, LockLevel.WRITE, sink); 755 756 TestDeadlockResults deadlocks = new TestDeadlockResults(); 757 lockManager.scanForDeadlocks(deadlocks); 758 759 assertEquals(1, deadlocks.chains.size()); 760 761 Map check = new HashMap (); 762 check.put(thread1, l1); 763 check.put(thread2, l1); 764 assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0), check); 765 766 lockManager.clearAllLocksFor(c0); 767 } 768 769 public void testLackOfDeadlock() throws InterruptedException { 770 lockManager.start(); 771 for (int i = 0; i < 500; i++) { 772 internalTestLackofDeadlock(false); 773 resetLockManager(true); 774 internalTestLackofDeadlock(true); 775 resetLockManager(true); 776 } 777 } 778 779 private void internalTestLackofDeadlock(boolean useRealThreads) throws InterruptedException { 780 List threads = new ArrayList (); 781 782 for (int t = 0; t < numThreads; t++) { 783 ChannelID cid = txns[t].getChannelID(); 784 ThreadID tid = txns[t].getClientThreadID(); 785 786 RandomRequest req = new RandomRequest(cid, tid); 787 if (useRealThreads) { 788 Thread thread = new Thread (req); 789 thread.start(); 790 threads.add(thread); 791 } else { 792 req.run(); 793 } 794 } 795 796 if (useRealThreads) { 797 for (Iterator iter = threads.iterator(); iter.hasNext();) { 798 Thread t = (Thread ) iter.next(); 799 t.join(); 800 } 801 } 802 803 TestDeadlockResults results = new TestDeadlockResults(); 804 lockManager.scanForDeadlocks(results); 805 806 assertEquals(0, results.chains.size()); 807 808 for (int i = 0; i < txns.length; i++) { 809 lockManager.clearAllLocksFor(txns[i].getChannelID()); 810 } 811 } 812 813 private class RandomRequest implements Runnable { 814 private final ChannelID cid; 815 private final ThreadID tid; 816 817 public RandomRequest(ChannelID cid, ThreadID tid) { 818 this.cid = cid; 819 this.tid = tid; 820 } 821 822 public void run() { 823 final int start = random.nextInt(numLocks); 824 final int howMany = random.nextInt(numLocks - start); 825 826 for (int i = 0; i < howMany; i++) { 827 LockID lock = locks[start + i]; 828 boolean read = random.nextInt(10) < 8; int level = read ? LockLevel.READ : LockLevel.WRITE; 830 boolean granted = lockManager.requestLock(lock, cid, tid, level, sink); 831 if (!granted) { 832 break; 833 } 834 } 835 } 836 } 837 838 private ServerThreadID[] makeUniqueTxns(int num) { 839 ServerThreadID[] rv = new ServerThreadID[num]; 840 for (int i = 0; i < num; i++) { 841 rv[i] = new ServerThreadID(new ChannelID(i), new ThreadID(i)); 842 } 843 return rv; 844 } 845 846 private LockID[] makeUniqueLocks(int num) { 847 LockID[] rv = new LockID[num]; 848 for (int i = 0; i < num; i++) { 849 rv[i] = new LockID("lock-" + i); 850 } 851 852 return rv; 853 } 854 855 private void assertSpecificDeadlock(DeadlockChain chain, Map check) { 856 DeadlockChain start = chain; 857 do { 858 LockID lock = (LockID) check.remove(chain.getWaiter()); 859 assertEquals(lock, chain.getWaitingOn()); 860 chain = chain.getNextLink(); 861 } while (chain != start); 862 863 assertEquals(0, check.size()); 864 } 865 866 public void testDeadLock2() { 867 872 LockID l1 = new LockID("L1"); 873 LockID l2 = new LockID("L2"); 874 LockID l3 = new LockID("L3"); 875 ChannelID c0 = new ChannelID(0); 876 ThreadID s1 = new ThreadID(1); 877 ThreadID s2 = new ThreadID(2); 878 ThreadID s3 = new ThreadID(3); 879 880 ServerThreadID thread1 = new ServerThreadID(c0, s1); 881 ServerThreadID thread2 = new ServerThreadID(c0, s2); 882 ServerThreadID thread3 = new ServerThreadID(c0, s3); 883 884 lockManager.start(); 885 886 lockManager.requestLock(l1, c0, s1, LockLevel.WRITE, sink); 888 lockManager.requestLock(l2, c0, s2, LockLevel.WRITE, sink); 890 lockManager.requestLock(l3, c0, s3, LockLevel.WRITE, sink); 892 893 lockManager.requestLock(l2, c0, s1, LockLevel.WRITE, sink); 895 lockManager.requestLock(l3, c0, s2, LockLevel.WRITE, sink); 897 lockManager.requestLock(l1, c0, s3, LockLevel.WRITE, sink); 899 900 TestDeadlockResults deadlocks = new TestDeadlockResults(); 901 lockManager.scanForDeadlocks(deadlocks); 902 903 assertEquals(1, deadlocks.chains.size()); 904 905 Map check = new HashMap (); 906 check.put(thread1, l2); 907 check.put(thread2, l3); 908 check.put(thread3, l1); 909 assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0), check); 910 911 lockManager.clearAllLocksFor(c0); 912 } 913 914 private class ShutdownThread extends Thread { 915 private final LinkedQueue shutdownSteps; 916 917 private ShutdownThread(LinkedQueue shutdownSteps) { 918 this.shutdownSteps = shutdownSteps; 919 } 920 921 public void run() { 922 try { 923 shutdownSteps.put(new Object ()); 924 lockManager.stop(); 925 shutdownSteps.put(new Object ()); 926 } catch (Exception e) { 927 e.printStackTrace(); 928 fail(); 929 } 930 } 931 } 932 933 private static class TestMessageChannel implements MessageChannel { 934 935 public TCSocketAddress getLocalAddress() { 936 throw new ImplementMe(); 937 } 938 939 public TCSocketAddress getRemoteAddress() { 940 try { 941 return new TCSocketAddress("127.0.0.1", 6969); 942 } catch (UnknownHostException e) { 943 throw new RuntimeException (e); 944 } 945 } 946 947 public void addListener(ChannelEventListener listener) { 948 throw new ImplementMe(); 949 } 950 951 public ChannelID getChannelID() { 952 throw new ImplementMe(); 953 } 954 955 public boolean isOpen() { 956 throw new ImplementMe(); 957 } 958 959 public boolean isClosed() { 960 throw new ImplementMe(); 961 } 962 963 public TCMessage createMessage(TCMessageType type) { 964 throw new ImplementMe(); 965 } 966 967 public Object getAttachment(String key) { 968 throw new ImplementMe(); 969 } 970 971 public void addAttachment(String key, Object value, boolean replace) { 972 throw new ImplementMe(); 973 } 974 975 public Object removeAttachment(String key) { 976 throw new ImplementMe(); 977 } 978 979 public boolean isConnected() { 980 throw new ImplementMe(); 981 } 982 983 public void send(TCNetworkMessage message) { 984 throw new ImplementMe(); 985 } 986 987 public NetworkStackID open() { 988 throw new ImplementMe(); 989 } 990 991 public void close() { 992 throw new ImplementMe(); 993 } 994 995 } 996 997 private static class TestDeadlockResults implements DeadlockResults { 998 final List chains = new ArrayList (); 999 1000 public void foundDeadlock(DeadlockChain chain) { 1001 chains.add(chain); 1002 } 1003 } 1004 1005} 1006 | Popular Tags |