package com.tc.objectserver.lockmanager.impl;

import org.apache.commons.io.output.NullOutputStream;

import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;

import com.tc.exception.ImplementMe;
import com.tc.net.TCSocketAddress;
import com.tc.net.protocol.NetworkStackID;
import com.tc.net.protocol.TCNetworkMessage;
import com.tc.net.protocol.tcm.ChannelEventListener;
import com.tc.net.protocol.tcm.ChannelID;
import com.tc.net.protocol.tcm.MessageChannel;
import com.tc.net.protocol.tcm.TCMessage;
import com.tc.net.protocol.tcm.TCMessageType;
import com.tc.object.lockmanager.api.LockID;
import com.tc.object.lockmanager.api.LockLevel;
import com.tc.object.lockmanager.api.ServerThreadID;
import com.tc.object.lockmanager.api.ThreadID;
import com.tc.object.tx.WaitInvocation;
import com.tc.objectserver.api.TestSink;
import com.tc.objectserver.context.LockResponseContext;
import com.tc.objectserver.lockmanager.api.DeadlockChain;
import com.tc.objectserver.lockmanager.api.DeadlockResults;
import com.tc.objectserver.lockmanager.api.LockHolder;
import com.tc.objectserver.lockmanager.api.LockMBean;
import com.tc.objectserver.lockmanager.api.NullChannelManager;
import com.tc.objectserver.lockmanager.api.ServerLockRequest;
import com.tc.objectserver.lockmanager.api.Waiter;
import com.tc.util.concurrent.ThreadUtil;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;

import junit.framework.TestCase;

/**
 * Tests for {@link LockManagerImpl}: lock MBean reporting, re-establishing locks and waits before the manager is
 * started, behaviour around {@code start()}/{@code stop()}, and deadlock scanning.
 * <p>
 * Every test must leave the lock manager without locks or thread contexts; {@link #tearDown()} asserts this. Tests
 * that cannot clean up behind themselves therefore swap in a fresh manager in a {@code finally} block.
 */
public class GreedyLockManagerTest extends TestCase {
  private TestSink               sink;
  private LockManagerImpl        lockManager;
  private final Random           random     = new Random();

  final int                      numLocks   = 100;
  final int                      numThreads = 30;
  private final LockID[]         locks      = makeUniqueLocks(numLocks);
  private final ServerThreadID[] txns       = makeUniqueTxns(numThreads);

  /**
   * Installs a fresh, not yet started lock manager and a fresh response sink.
   */
  protected void setUp() throws Exception {
    super.setUp();
    resetLockManager();
    sink = new TestSink();
  }

  /**
   * Replaces the current lock manager with a new one that has not been started.
   */
  private void resetLockManager() {
    resetLockManager(false);
  }

  /**
   * Stops the current lock manager (if there is one) and replaces it with a new instance.
   *
   * @param start whether the new lock manager should be started right away
   */
  private void resetLockManager(boolean start) {
    if (lockManager != null) {
      try {
        lockManager.stop();
      } catch (InterruptedException e) {
        fail("Interrupted while stopping the previous lock manager: " + e);
      }
    }

    lockManager = new LockManagerImpl(new NullChannelManager());
    if (start) {
      lockManager.start();
    }
  }

  /**
   * Verifies that the test that just ran left no locks and no thread contexts behind.
   */
  protected void tearDown() throws Exception {
    assertEquals(0, lockManager.getLockCount());
    assertEquals(0, lockManager.getThreadContextCount());
    super.tearDown();
  }

  /**
   * Requests locks on three lock IDs from three channels and checks the {@link LockMBean}s reported by
   * {@code getAllLocks()}: they must be serializable and must list the expected holders and pending requests.
   * Only channel 1 is known to the channel manager; the others report "no longer connected".
   */
  public void testLockMBean() throws IOException {

    final MessageChannel channel = new TestMessageChannel();

    final long start = System.currentTimeMillis();
    final ChannelID cid1 = new ChannelID(1);
    ChannelID cid2 = new ChannelID(2);
    ChannelID cid3 = new ChannelID(3);
    LockID lid1 = new LockID("1");
    LockID lid2 = new LockID("2");
    LockID lid3 = new LockID("3");
    ThreadID tid1 = new ThreadID(1);
    WaitInvocation wait = new WaitInvocation(Integer.MAX_VALUE);

    lockManager = new LockManagerImpl(new NullChannelManager() {
      public MessageChannel getChannel(ChannelID id) {
        if (cid1.equals(id)) { return channel; }
        return null;
      }

      public String getChannelAddress(ChannelID channelID) {
        if (cid1.equals(channelID)) { return "127.0.0.1:6969"; }
        return "no longer connected";
      }
    });

    lockManager.start();

    // lock 1: two WRITE requests (channels 1 and 2)
    lockManager.requestLock(lid1, cid1, tid1, LockLevel.WRITE, sink);
    lockManager.requestLock(lid1, cid2, tid1, LockLevel.WRITE, sink);

    // lock 2: two READ requests (channels 1 and 2) followed by a WRITE request (channel 3)
    lockManager.requestLock(lid2, cid1, tid1, LockLevel.READ, sink);
    lockManager.requestLock(lid2, cid2, tid1, LockLevel.READ, sink);
    lockManager.requestLock(lid2, cid3, tid1, LockLevel.WRITE, sink);

    // lock 3: a single WRITE request (channel 1)
    lockManager.requestLock(lid3, cid1, tid1, LockLevel.WRITE, sink);

    LockMBean[] lockBeans = lockManager.getAllLocks();
    assertEquals(3, lockBeans.length);
    sortLocksByID(lockBeans);

    LockMBean bean1 = lockBeans[0];
    LockMBean bean2 = lockBeans[1];
    LockMBean bean3 = lockBeans[2];
    testSerialize(bean1);
    testSerialize(bean2);
    testSerialize(bean3);

    validateBean1(bean1, start);
    validateBean2(bean2, start);
    validateBean3(bean3, start, wait);

    // leave nothing behind for tearDown()
    lockManager.clearAllLocksFor(cid1);
    lockManager.clearAllLocksFor(cid2);
    lockManager.clearAllLocksFor(cid3);
  }

  /**
   * Checks the bean for lock "3": one WRITE holder on channel 1, nothing pending.
   *
   * @param bean3 the bean under inspection
   * @param time lower bound for the holder's acquisition time
   * @param wait not used by the current assertions
   */
  private void validateBean3(LockMBean bean3, long time, WaitInvocation wait) {
    LockHolder[] holders = bean3.getHolders();
    ServerLockRequest[] reqs = bean3.getPendingRequests();
    ServerLockRequest[] upgrades = bean3.getPendingUpgrades();
    Waiter[] waiters = bean3.getWaiters();
    assertEquals(1, holders.length);
    assertEquals(0, reqs.length);
    assertEquals(0, upgrades.length);
    assertEquals(0, waiters.length);

    LockHolder holder = holders[0];
    assertEquals(LockLevel.toString(LockLevel.WRITE), holder.getLockLevel());
    assertTrue(holder.getTimeAcquired() >= time);
    assertEquals(new ChannelID(1), holder.getChannelID());
    assertEquals("127.0.0.1:6969", holder.getChannelAddr());
    assertEquals(ThreadID.VM_ID, holder.getThreadID());
  }

  /**
   * Checks the bean for lock "2": two READ holders (channels 1 and 2, in either order) and one pending WRITE
   * request from channel 3.
   *
   * @param bean2 the bean under inspection
   * @param time lower bound for acquisition and request times
   */
  private void validateBean2(LockMBean bean2, long time) {
    LockHolder[] holders = bean2.getHolders();
    ServerLockRequest[] reqs = bean2.getPendingRequests();
    ServerLockRequest[] upgrades = bean2.getPendingUpgrades();
    Waiter[] waiters = bean2.getWaiters();
    assertEquals(2, holders.length);
    assertEquals(1, reqs.length);
    assertEquals(0, upgrades.length);
    assertEquals(0, waiters.length);

    assertReadHolder(holders[0], time);
    assertReadHolder(holders[1], time);

    ServerLockRequest req = reqs[0];
    assertEquals(LockLevel.toString(LockLevel.WRITE), req.getLockLevel());
    assertTrue(req.getRequestTime() >= time);
    assertEquals(new ChannelID(3), req.getChannelID());
    assertEquals("no longer connected", req.getChannelAddr());
    assertEquals(new ThreadID(1), req.getThreadID());
  }

  /**
   * Asserts that the given holder is a READ holder on channel 1 or channel 2, carries the channel address that
   * matches its channel, was acquired no earlier than {@code time}, and is recorded under
   * {@link ThreadID#VM_ID}.
   */
  private void assertReadHolder(LockHolder holder, long time) {
    assertEquals(LockLevel.toString(LockLevel.READ), holder.getLockLevel());
    assertTrue(holder.getTimeAcquired() >= time);
    if (new ChannelID(1).equals(holder.getChannelID())) {
      assertEquals("127.0.0.1:6969", holder.getChannelAddr());
    } else if (new ChannelID(2).equals(holder.getChannelID())) {
      assertEquals("no longer connected", holder.getChannelAddr());
    } else {
      fail("Invalid Channel number ! " + holder.getChannelID());
    }
    assertEquals(ThreadID.VM_ID, holder.getThreadID());
  }

  /**
   * Checks the bean for lock "1": one WRITE holder on channel 1 and one pending WRITE request from channel 2.
   *
   * @param bean1 the bean under inspection
   * @param time lower bound for acquisition and request times
   */
  private void validateBean1(LockMBean bean1, long time) {
    LockHolder[] holders = bean1.getHolders();
    ServerLockRequest[] reqs = bean1.getPendingRequests();
    ServerLockRequest[] upgrades = bean1.getPendingUpgrades();
    Waiter[] waiters = bean1.getWaiters();
    assertEquals(1, holders.length);
    assertEquals(1, reqs.length);
    assertEquals(0, upgrades.length);
    assertEquals(0, waiters.length);

    LockHolder holder = holders[0];
    assertEquals(LockLevel.toString(LockLevel.WRITE), holder.getLockLevel());
    assertTrue(holder.getTimeAcquired() >= time);
    assertEquals(new ChannelID(1), holder.getChannelID());
    assertEquals(ThreadID.VM_ID, holder.getThreadID());
    assertEquals("127.0.0.1:6969", holder.getChannelAddr());

    ServerLockRequest req = reqs[0];
    assertEquals(LockLevel.toString(LockLevel.WRITE), req.getLockLevel());
    assertTrue(req.getRequestTime() >= time);
    assertEquals(new ChannelID(2), req.getChannelID());
    assertEquals("no longer connected", req.getChannelAddr());
    assertEquals(new ThreadID(1), req.getThreadID());
  }

  /**
   * Writes the object through Java serialization into a discarding stream; fails with an exception if the object
   * (or anything it references) cannot be serialized.
   */
  private void testSerialize(Object o) throws IOException {
    ObjectOutputStream oos = new ObjectOutputStream(new NullOutputStream());
    try {
      oos.writeObject(o);
    } finally {
      // close even when writeObject() throws
      oos.close();
    }
  }

  /**
   * Sorts the beans in place by lock name so that the test can address them by index.
   */
  private void sortLocksByID(LockMBean[] lockBeans) {
    Arrays.sort(lockBeans, new Comparator() {
      public int compare(Object o1, Object o2) {
        LockMBean l1 = (LockMBean) o1;
        LockMBean l2 = (LockMBean) o2;

        String id1 = l1.getLockName();
        String id2 = l2.getLockName();

        return id1.compareTo(id2);
      }
    });
  }

  /**
   * Re-establishes two timed waits on the same lock before the manager is started, then starts it and checks the
   * sequence of responses: a wait timeout for the first thread, then (in either order) a non-greedy award for
   * the first thread and a wait timeout, then - after the first thread unlocks - a greedy award, and nothing
   * else.
   */
  public void testReestablishWait() throws Exception {
    LockID lockID1 = new LockID("my lock");
    ChannelID channel1 = new ChannelID(1);
    ThreadID tx1 = new ThreadID(1);
    ThreadID tx2 = new ThreadID(2);

    try {
      assertEquals(0, lockManager.getLockCount());
      long waitTime = 1000;
      WaitInvocation waitCall1 = new WaitInvocation(waitTime);
      WaitInvocation waitCall2 = new WaitInvocation(waitTime * 2);
      TestSink responseSink = new TestSink();
      long t0 = System.currentTimeMillis();
      lockManager.reestablishWait(lockID1, channel1, tx1, LockLevel.WRITE, waitCall1, responseSink);
      lockManager.reestablishWait(lockID1, channel1, tx2, LockLevel.WRITE, waitCall2, responseSink);
      lockManager.start();

      // first response: tx1's wait times out, and not before waitTime has elapsed
      LockResponseContext ctxt = (LockResponseContext) responseSink.take();
      assertTrue(System.currentTimeMillis() - t0 >= waitTime);
      assertTrue(ctxt.isLockWaitTimeout());
      assertResponseContext(lockID1, channel1, tx1, LockLevel.WRITE, ctxt);

      // next two responses: one award and one wait timeout, in either order
      LockResponseContext ctxt1 = (LockResponseContext) responseSink.take();
      LockResponseContext ctxt2 = (LockResponseContext) responseSink.take();
      assertTrue(System.currentTimeMillis() - t0 >= waitTime);
      assertTrue((ctxt1.isLockAward() && ctxt2.isLockWaitTimeout())
                 || (ctxt2.isLockAward() && ctxt1.isLockWaitTimeout()));

      if (ctxt1.isLockAward()) {
        assertAwardNotGreedy(ctxt1, lockID1, tx1);
      } else if (ctxt2.isLockAward()) {
        assertAwardNotGreedy(ctxt2, lockID1, tx1);
      }

      lockManager.unlock(lockID1, channel1, tx1);

      // after tx1 unlocks, the next award is a greedy one
      ctxt = (LockResponseContext) responseSink.take();
      assertAwardGreedy(ctxt, lockID1);

      // and nothing else arrives
      assertNull(responseSink.waitForAdd(waitTime * 3));

    } finally {
      // drop the manager without stopping it and start over with a clean one
      lockManager = null;
      resetLockManager();
    }
  }

  /**
   * Asserts that the context is a lock award for the given thread and lock at a non-greedy lock level.
   */
  private void assertAwardNotGreedy(LockResponseContext ctxt, LockID lockID1, ThreadID tx1) {
    assertNotNull(ctxt);
    assertTrue(ctxt.isLockAward());
    assertTrue(ctxt.getThreadID().equals(tx1));
    assertTrue(ctxt.getLockID().equals(lockID1));
    assertTrue(!LockLevel.isGreedy(ctxt.getLockLevel()));
  }

  /**
   * Asserts that the context is a lock award for the given lock at a greedy lock level, addressed to
   * {@link ThreadID#VM_ID}.
   */
  private void assertAwardGreedy(LockResponseContext ctxt, LockID lockID1) {
    assertNotNull(ctxt);
    assertTrue(ctxt.isLockAward());
    assertTrue(ctxt.getThreadID().equals(ThreadID.VM_ID));
    assertTrue(ctxt.getLockID().equals(lockID1));
    assertTrue(LockLevel.isGreedy(ctxt.getLockLevel()));
  }

  /**
   * A thread that has re-established a wait on a lock must not be able to re-establish the same lock as a
   * holder (an {@link AssertionError} is expected); a different thread may.
   */
  public void testReestablishLockAfterReestablishWait() throws Exception {
    LockID lockID1 = new LockID("my lock");
    ChannelID channel1 = new ChannelID(1);
    ThreadID tx1 = new ThreadID(1);
    ThreadID tx2 = new ThreadID(2);
    int requestedLevel = LockLevel.WRITE;
    WaitInvocation waitCall = new WaitInvocation();
    try {
      TestSink responseSink = new TestSink();
      assertEquals(0, lockManager.getLockCount());
      lockManager.reestablishWait(lockID1, channel1, tx1, LockLevel.WRITE, waitCall, responseSink);
      assertEquals(1, lockManager.getLockCount());
      assertEquals(0, responseSink.getInternalQueue().size());

      // The outcome is recorded in a flag and asserted outside the try block: a fail() placed inside the try
      // throws an Error of its own, which a catch clause for errors may swallow.
      boolean threw = false;
      try {
        lockManager.reestablishLock(lockID1, channel1, tx1, requestedLevel, responseSink);
      } catch (AssertionError e) {
        // expected
        threw = true;
      }
      assertTrue("Should have thrown an AssertionError.", threw);

      assertEquals(1, lockManager.getLockCount());
      lockManager.reestablishLock(lockID1, channel1, tx2, requestedLevel, responseSink);
    } finally {
      lockManager = null;
      resetLockManager();
    }
  }

  /**
   * Several READ locks can be re-established on the same lock, but a WRITE cannot be re-established on top of
   * READ holders, nor a READ on top of a WRITE holder.
   */
  public void testReestablishReadLock() throws Exception {
    LockID lockID1 = new LockID("my lock");
    ChannelID channel1 = new ChannelID(1);
    ThreadID tx1 = new ThreadID(1);
    ThreadID tx2 = new ThreadID(2);
    ThreadID tx3 = new ThreadID(3);
    int requestedLevel = LockLevel.READ;

    try {
      TestSink responseSink = new TestSink();
      assertEquals(0, lockManager.getLockCount());

      lockManager.reestablishLock(lockID1, channel1, tx1, requestedLevel, responseSink);
      assertEquals(1, lockManager.getLockCount());

      // a second READ on the same lock is fine
      responseSink = new TestSink();
      lockManager.reestablishLock(lockID1, channel1, tx2, requestedLevel, responseSink);
      assertEquals(1, lockManager.getLockCount());

      // a WRITE on top of the READ holders is not
      responseSink = new TestSink();
      boolean threw = false;
      try {
        lockManager.reestablishLock(lockID1, channel1, tx3, LockLevel.WRITE, responseSink);
      } catch (AssertionError e) {
        // expected
        threw = true;
      }
      assertTrue("Should have thrown a LockManagerError.", threw);

    } finally {
      lockManager = null;
      resetLockManager();
    }

    try {
      TestSink responseSink = new TestSink();
      assertEquals(0, lockManager.getLockCount());
      lockManager.reestablishLock(lockID1, channel1, tx1, LockLevel.WRITE, responseSink);
      assertEquals(1, lockManager.getLockCount());

      // a READ on top of a WRITE holder is not allowed either
      responseSink = new TestSink();
      boolean threw = false;
      try {
        lockManager.reestablishLock(lockID1, channel1, tx2, LockLevel.READ, responseSink);
      } catch (Error e) {
        // expected
        threw = true;
      }
      // Asserted outside the try block: catch (Error) would swallow the Error thrown by fail().
      assertTrue("Should have thrown a LockManagerError", threw);

    } finally {
      lockManager = null;
      resetLockManager();
    }
  }

  /**
   * A WRITE lock can be re-established once per lock; a second WRITE holder on the same lock is rejected, and so
   * is any re-establish call made after the manager has been started.
   */
  public void testReestablishWriteLock() throws Exception {

    LockID lockID1 = new LockID("my lock");
    LockID lockID2 = new LockID("my other lock");
    ChannelID channel1 = new ChannelID(1);
    ChannelID channel2 = new ChannelID(2);
    ThreadID tx1 = new ThreadID(1);
    ThreadID tx2 = new ThreadID(2);
    int requestedLevel = LockLevel.WRITE;

    try {
      TestSink responseSink = new TestSink();
      assertEquals(0, lockManager.getLockCount());
      lockManager.reestablishLock(lockID1, channel1, tx1, requestedLevel, responseSink);
      assertEquals(1, lockManager.getLockCount());

      boolean threw = false;
      try {
        lockManager.reestablishLock(lockID1, channel2, tx2, requestedLevel, responseSink);
      } catch (AssertionError e) {
        // expected
        threw = true;
      }
      assertTrue("Expected a LockManagerError!", threw);

      // the same thread may hold a different lock
      lockManager.reestablishLock(lockID2, channel1, tx1, requestedLevel, responseSink);

      lockManager.start();

      threw = false;
      try {
        lockManager.reestablishLock(lockID1, channel1, tx1, requestedLevel, null);
      } catch (Error e) {
        // expected
        threw = true;
      }
      // Asserted outside the try block: catch (Error) would swallow the Error thrown by fail().
      assertTrue("Should have thrown a LockManagerError", threw);

    } finally {
      lockManager = null;
      resetLockManager();
    }
  }

  /**
   * Asserts that the response context carries exactly the given lock, channel, thread and lock level.
   */
  private void assertResponseContext(LockID lockID, ChannelID channel, ThreadID tx1, int requestedLevel,
                                     LockResponseContext ctxt) {
    assertEquals(lockID, ctxt.getLockID());
    assertEquals(channel, ctxt.getChannelID());
    assertEquals(tx1, ctxt.getThreadID());
    assertEquals(requestedLevel, ctxt.getLockLevel());
  }

  /**
   * A re-established timed wait must not produce any response while the manager has not been started, even after
   * twice the wait time; once the manager is started a response shows up.
   */
  public void testWaitTimeoutsIgnoredDuringStartup() throws Exception {
    LockID lockID = new LockID("my lcok");
    ChannelID channel1 = new ChannelID(1);
    ThreadID tx1 = new ThreadID(1);
    try {
      long waitTime = 1000;
      WaitInvocation waitInvocation = new WaitInvocation(waitTime);
      TestSink responseSink = new TestSink();
      lockManager.reestablishWait(lockID, channel1, tx1, LockLevel.WRITE, waitInvocation, responseSink);

      LockResponseContext ctxt = (LockResponseContext) responseSink.waitForAdd(waitTime * 2);
      assertNull(ctxt);

      lockManager.start();
      ctxt = (LockResponseContext) responseSink.waitForAdd(0);
      assertNotNull(ctxt);
    } finally {
      lockManager = null;
      resetLockManager();
    }
  }

  /**
   * {@code stop()} must return even though a lock is still outstanding: the shutdown thread has to post both of
   * its steps without anybody unlocking.
   */
  public void testOffDoesNotBlockUntilNoOutstandingLocksViaUnlock() throws Exception {
    List queue = sink.getInternalQueue();
    ChannelID channel1 = new ChannelID(1);
    LockID lock1 = new LockID("1");
    ThreadID tx1 = new ThreadID(1);

    final LinkedQueue shutdownSteps = new LinkedQueue();
    ShutdownThread shutdown = new ShutdownThread(shutdownSteps);
    try {
      lockManager.start();
      lockManager.requestLock(lock1, channel1, tx1, LockLevel.WRITE, sink);
      assertEquals(1, queue.size());

      shutdown.start();
      shutdownSteps.take(); // the shutdown thread is about to call stop()
      ThreadUtil.reallySleep(1000);
      shutdownSteps.take(); // the shutdown thread is done with stop()

      // A failure inside the shutdown thread is invisible to JUnit unless it is checked here.
      assertNull("lockManager.stop() failed in the shutdown thread: " + shutdown.getFailure(),
                 shutdown.getFailure());
    } finally {
      lockManager = null;
      resetLockManager();
    }
  }

  /**
   * While started, the manager awards the lock (greedily) on every request; after {@code stop()} a request
   * produces no response at all.
   */
  public void testOffStopsGrantingNewLocks() throws Exception {
    List queue = sink.getInternalQueue();
    ChannelID channelID = new ChannelID(1);
    LockID lockID = new LockID("1");
    ThreadID txID = new ThreadID(1);
    try {
      lockManager.start();

      lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink);
      assertEquals(1, queue.size());
      assertAwardGreedy((LockResponseContext) queue.get(0), lockID);
      queue.clear();
      lockManager.unlock(lockID, channelID, ThreadID.VM_ID);

      lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink);
      assertEquals(1, queue.size());
      assertAwardGreedy((LockResponseContext) queue.get(0), lockID);
      queue.clear();
      lockManager.unlock(lockID, channelID, ThreadID.VM_ID);

      queue.clear();
      lockManager.stop();

      lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink);
      assertEquals(0, queue.size());
    } finally {
      lockManager.clearAllLocksFor(channelID);
    }
  }

  /**
   * A second WRITE request for a lock that is already held does not get an award; instead a recall of the WRITE
   * lock is sent to the holding channel (addressed to {@link ThreadID#VM_ID}).
   */
  public void testRequestDoesntGrantPendingLocks() throws Exception {
    List queue = sink.getInternalQueue();
    ChannelID channelID = new ChannelID(1);
    LockID lockID = new LockID("1");
    ThreadID txID = new ThreadID(1);

    try {
      lockManager.start();
      lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink);
      queue.clear();

      lockManager.requestLock(lockID, new ChannelID(2), new ThreadID(2), LockLevel.WRITE, sink);
      assertEquals(1, queue.size());

      LockResponseContext lrc = (LockResponseContext) sink.take();
      assertTrue(lrc.isLockRecall());
      assertEquals(lockID, lrc.getLockID());
      assertEquals(channelID, lrc.getChannelID());
      assertEquals(ThreadID.VM_ID, lrc.getThreadID());
      assertEquals(LockLevel.WRITE, lrc.getLockLevel());
    } finally {
      lockManager = null;
      resetLockManager();
    }
  }

  /**
   * After {@code stop()}, an unlock must not trigger any response, even though another request is pending on the
   * lock.
   */
  public void testUnlockIgnoredDuringShutdown() throws Exception {
    List queue = sink.getInternalQueue();
    ChannelID channelID = new ChannelID(1);
    LockID lockID = new LockID("1");
    ThreadID txID = new ThreadID(1);
    try {
      lockManager.start();
      lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink);
      queue.clear();

      lockManager.requestLock(lockID, new ChannelID(2), new ThreadID(2), LockLevel.WRITE, sink);
      assertEquals(1, queue.size());

      LockResponseContext lrc = (LockResponseContext) sink.take();
      assertTrue(lrc.isLockRecall());
      assertEquals(lockID, lrc.getLockID());
      assertEquals(channelID, lrc.getChannelID());
      assertEquals(ThreadID.VM_ID, lrc.getThreadID());
      assertEquals(LockLevel.WRITE, lrc.getLockLevel());

      assertEquals(0, queue.size());

      lockManager.stop();

      lockManager.unlock(lockID, channelID, txID);
      assertEquals(0, queue.size());

    } finally {
      lockManager = null;
      resetLockManager();
    }
  }

  /**
   * Two threads, two locks, each thread requesting the lock the other one asked for first.
   * <p>
   * NOTE: currently disabled by the unconditional early return; the body is only kept compiling.
   */
  public void testDeadLock1() {
    if (true) return;

    LockID l1 = new LockID("1");
    LockID l2 = new LockID("2");
    ChannelID c1 = new ChannelID(1);

    ThreadID s1 = new ThreadID(1);
    ThreadID s2 = new ThreadID(2);

    ServerThreadID thread1 = new ServerThreadID(c1, s1);
    ServerThreadID thread2 = new ServerThreadID(c1, s2);

    lockManager.start();
    lockManager.requestLock(l1, c1, s1, LockLevel.WRITE, sink);
    lockManager.requestLock(l2, c1, s2, LockLevel.WRITE, sink);
    lockManager.requestLock(l2, c1, s1, LockLevel.WRITE, sink);
    lockManager.requestLock(l1, c1, s2, LockLevel.WRITE, sink);

    TestDeadlockResults deadlocks = new TestDeadlockResults();
    lockManager.scanForDeadlocks(deadlocks);

    assertEquals(1, deadlocks.chains.size());
    Map check = new HashMap();
    check.put(thread1, l2);
    check.put(thread2, l1);
    assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0), check);

    // the array-returning variant must report the same chain
    DeadlockChain[] results = lockManager.scanForDeadlocks();
    assertEquals(1, results.length);
    check = new HashMap();
    check.put(thread1, l2);
    check.put(thread2, l1);
    assertSpecificDeadlock(results[0], check);

    lockManager.clearAllLocksFor(c1);
  }

  /**
   * Same cycle as {@link #testDeadLock1()}, with additional READ locks requested by both threads beforehand.
   * <p>
   * NOTE: currently disabled by the unconditional early return; the body is only kept compiling.
   */
  public void testDeadLock3() {
    if (true) return;

    // the two locks taking part in the cycle
    LockID l1 = new LockID("1");
    LockID l2 = new LockID("2");

    // locks that are only requested for READ
    LockID l3 = new LockID("3");
    LockID l4 = new LockID("4");
    LockID l5 = new LockID("5");

    ChannelID c1 = new ChannelID(1);
    ThreadID s1 = new ThreadID(1);
    ThreadID s2 = new ThreadID(2);

    ServerThreadID thread1 = new ServerThreadID(c1, s1);
    ServerThreadID thread2 = new ServerThreadID(c1, s2);

    lockManager.start();

    lockManager.requestLock(l3, c1, s1, LockLevel.READ, sink);
    lockManager.requestLock(l4, c1, s1, LockLevel.READ, sink);
    lockManager.requestLock(l5, c1, s1, LockLevel.READ, sink);
    lockManager.requestLock(l3, c1, s2, LockLevel.READ, sink);
    lockManager.requestLock(l4, c1, s2, LockLevel.READ, sink);

    lockManager.requestLock(l1, c1, s1, LockLevel.WRITE, sink);
    lockManager.requestLock(l2, c1, s2, LockLevel.WRITE, sink);
    lockManager.requestLock(l2, c1, s1, LockLevel.WRITE, sink);
    lockManager.requestLock(l1, c1, s2, LockLevel.WRITE, sink);

    TestDeadlockResults deadlocks = new TestDeadlockResults();
    lockManager.scanForDeadlocks(deadlocks);

    assertEquals(1, deadlocks.chains.size());
    Map check = new HashMap();
    check.put(thread1, l2);
    check.put(thread2, l1);
    assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0), check);

    lockManager.clearAllLocksFor(c1);
  }

  /**
   * Two threads request READ on the same lock and then both request WRITE on it.
   * <p>
   * NOTE: currently disabled by the unconditional early return; the body is only kept compiling.
   */
  public void testUpgradeDeadLock() {
    if (true) return;

    LockID l1 = new LockID("L1");

    ChannelID c0 = new ChannelID(0);
    ThreadID s1 = new ThreadID(1);
    ThreadID s2 = new ThreadID(2);

    ServerThreadID thread1 = new ServerThreadID(c0, s1);
    ServerThreadID thread2 = new ServerThreadID(c0, s2);

    lockManager.start();

    lockManager.requestLock(l1, c0, s1, LockLevel.READ, sink);
    lockManager.requestLock(l1, c0, s2, LockLevel.READ, sink);

    lockManager.requestLock(l1, c0, s1, LockLevel.WRITE, sink);
    lockManager.requestLock(l1, c0, s2, LockLevel.WRITE, sink);

    TestDeadlockResults deadlocks = new TestDeadlockResults();
    lockManager.scanForDeadlocks(deadlocks);

    assertEquals(1, deadlocks.chains.size());

    Map check = new HashMap();
    check.put(thread1, l1);
    check.put(thread2, l1);
    assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0), check);

    lockManager.clearAllLocksFor(c0);
  }

  /**
   * Runs the random-request scenario 50 times each in the calling thread and with real threads, on a freshly
   * started lock manager every time; no run may report a deadlock.
   */
  public void testLackOfDeadlock() throws InterruptedException {
    lockManager.start();
    for (int i = 0; i < 50; i++) {
      internalTestLackofDeadlock(false);
      resetLockManager(true);
      internalTestLackofDeadlock(true);
      resetLockManager(true);
    }
  }

  /**
   * Lets every entry of {@code txns} issue a {@link RandomRequest}, then asserts that a deadlock scan finds
   * nothing and clears all locks of the channels involved.
   *
   * @param useRealThreads {@code true} to run each request in a thread of its own (all of them are joined before
   *        scanning), {@code false} to run the requests one after the other in the calling thread
   */
  private void internalTestLackofDeadlock(boolean useRealThreads) throws InterruptedException {
    List threads = new ArrayList();

    for (int t = 0; t < numThreads; t++) {
      ChannelID cid = txns[t].getChannelID();
      ThreadID tid = txns[t].getClientThreadID();

      RandomRequest req = new RandomRequest(cid, tid);
      if (useRealThreads) {
        Thread thread = new Thread(req);
        thread.start();
        threads.add(thread);
      } else {
        req.run();
      }
    }

    if (useRealThreads) {
      for (Iterator iter = threads.iterator(); iter.hasNext();) {
        Thread t = (Thread) iter.next();
        t.join();
      }
    }

    TestDeadlockResults results = new TestDeadlockResults();
    lockManager.scanForDeadlocks(results);

    assertEquals(0, results.chains.size());

    for (int i = 0; i < txns.length; i++) {
      lockManager.clearAllLocksFor(txns[i].getChannelID());
    }
  }

  /**
   * Requests a random run of consecutive entries of {@code locks} (mostly READ, sometimes WRITE) for one
   * channel/thread pair and stops at the first request that is not granted.
   */
  private class RandomRequest implements Runnable {
    private final ChannelID cid;
    private final ThreadID  tid;

    public RandomRequest(ChannelID cid, ThreadID tid) {
      this.cid = cid;
      this.tid = tid;
    }

    public void run() {
      // start is in [0, numLocks), so start + howMany never runs past the end of the locks array
      final int start = random.nextInt(numLocks);
      final int howMany = random.nextInt(numLocks - start);

      for (int i = 0; i < howMany; i++) {
        LockID lock = locks[start + i];
        boolean read = random.nextInt(10) < 8; // 8 out of 10 requests are READs
        int level = read ? LockLevel.READ : LockLevel.WRITE;
        boolean granted = lockManager.requestLock(lock, cid, tid, level, sink);
        if (!granted) {
          break;
        }
      }
    }
  }

  /**
   * Creates {@code num} server thread IDs; entry {@code i} pairs channel {@code i} with thread {@code i}.
   */
  private ServerThreadID[] makeUniqueTxns(int num) {
    ServerThreadID[] rv = new ServerThreadID[num];
    for (int i = 0; i < num; i++) {
      rv[i] = new ServerThreadID(new ChannelID(i), new ThreadID(i));
    }
    return rv;
  }

  /**
   * Creates {@code num} lock IDs named {@code "lock-0"}, {@code "lock-1"}, and so on.
   */
  private LockID[] makeUniqueLocks(int num) {
    LockID[] rv = new LockID[num];
    for (int i = 0; i < num; i++) {
      rv[i] = new LockID("lock-" + i);
    }

    return rv;
  }

  /**
   * Walks the deadlock chain once around and asserts that every waiter waits on the lock given for it in
   * {@code check}, and that the chain covers every entry of {@code check}.
   *
   * @param chain any link of the chain; the walk ends when it gets back to this link
   * @param check expected lock per waiter; entries are removed as they are matched, so the map is empty
   *        afterwards
   */
  private void assertSpecificDeadlock(DeadlockChain chain, Map check) {
    DeadlockChain start = chain;
    do {
      LockID lock = (LockID) check.remove(chain.getWaiter());
      assertEquals(lock, chain.getWaitingOn());
      chain = chain.getNextLink();
    } while (chain != start);

    assertEquals(0, check.size());
  }

  /**
   * Three threads and three locks forming a cycle: each thread requests one lock and then the lock first
   * requested by the next thread.
   * <p>
   * NOTE: currently disabled by the unconditional early return; the body is only kept compiling.
   */
  public void testDeadLock2() {
    if (true) return;

    LockID l1 = new LockID("L1");
    LockID l2 = new LockID("L2");
    LockID l3 = new LockID("L3");
    ChannelID c0 = new ChannelID(0);
    ThreadID s1 = new ThreadID(1);
    ThreadID s2 = new ThreadID(2);
    ThreadID s3 = new ThreadID(3);

    ServerThreadID thread1 = new ServerThreadID(c0, s1);
    ServerThreadID thread2 = new ServerThreadID(c0, s2);
    ServerThreadID thread3 = new ServerThreadID(c0, s3);

    lockManager.start();

    lockManager.requestLock(l1, c0, s1, LockLevel.WRITE, sink);
    lockManager.requestLock(l2, c0, s2, LockLevel.WRITE, sink);
    lockManager.requestLock(l3, c0, s3, LockLevel.WRITE, sink);

    lockManager.requestLock(l2, c0, s1, LockLevel.WRITE, sink);
    lockManager.requestLock(l3, c0, s2, LockLevel.WRITE, sink);
    lockManager.requestLock(l1, c0, s3, LockLevel.WRITE, sink);

    TestDeadlockResults deadlocks = new TestDeadlockResults();
    lockManager.scanForDeadlocks(deadlocks);

    assertEquals(1, deadlocks.chains.size());

    Map check = new HashMap();
    check.put(thread1, l2);
    check.put(thread2, l3);
    check.put(thread3, l1);
    assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0), check);

    lockManager.clearAllLocksFor(c0);
  }

  /**
   * Calls {@code lockManager.stop()} from a separate thread and posts one object to {@code shutdownSteps} before
   * the call and one after it.
   * <p>
   * An exception raised in this thread cannot fail the test directly (JUnit only observes the test thread), so
   * it is recorded and made available through {@link #getFailure()}. The second step is posted even when
   * {@code stop()} fails, so that the test thread is not left blocked waiting for it.
   */
  private class ShutdownThread extends Thread {
    private final LinkedQueue  shutdownSteps;
    private volatile Exception failure;

    private ShutdownThread(LinkedQueue shutdownSteps) {
      this.shutdownSteps = shutdownSteps;
    }

    /**
     * @return the exception that interrupted the shutdown sequence, or {@code null} if there was none
     */
    Exception getFailure() {
      return failure;
    }

    public void run() {
      try {
        shutdownSteps.put(new Object());
        lockManager.stop();
      } catch (Exception e) {
        e.printStackTrace();
        // recorded before the second step is posted, so the test thread sees it once it has taken that step
        failure = e;
      }

      try {
        shutdownSteps.put(new Object());
      } catch (InterruptedException e) {
        e.printStackTrace();
        if (failure == null) {
          failure = e;
        }
      }
    }
  }

  /**
   * Minimal {@link MessageChannel} stub: only {@link #getRemoteAddress()} is implemented, every other method
   * throws {@link ImplementMe}.
   */
  private static class TestMessageChannel implements MessageChannel {

    public TCSocketAddress getLocalAddress() {
      throw new ImplementMe();
    }

    public TCSocketAddress getRemoteAddress() {
      try {
        return new TCSocketAddress("127.0.0.1", 6969);
      } catch (UnknownHostException e) {
        throw new RuntimeException(e);
      }
    }

    public void addListener(ChannelEventListener listener) {
      throw new ImplementMe();
    }

    public ChannelID getChannelID() {
      throw new ImplementMe();
    }

    public boolean isOpen() {
      throw new ImplementMe();
    }

    public boolean isClosed() {
      throw new ImplementMe();
    }

    public TCMessage createMessage(TCMessageType type) {
      throw new ImplementMe();
    }

    public Object getAttachment(String key) {
      throw new ImplementMe();
    }

    public void addAttachment(String key, Object value, boolean replace) {
      throw new ImplementMe();
    }

    public Object removeAttachment(String key) {
      throw new ImplementMe();
    }

    public boolean isConnected() {
      throw new ImplementMe();
    }

    public void send(TCNetworkMessage message) {
      throw new ImplementMe();
    }

    public NetworkStackID open() {
      throw new ImplementMe();
    }

    public void close() {
      throw new ImplementMe();
    }

  }

  /**
   * {@link DeadlockResults} implementation that simply collects every reported chain.
   */
  private static class TestDeadlockResults implements DeadlockResults {
    final List chains = new ArrayList();

    public void foundDeadlock(DeadlockChain chain) {
      chains.add(chain);
    }
  }

}