package com.tc.object.lockmanager.impl;

import com.tc.exception.TCRuntimeException;
import com.tc.logging.TCLogger;
import com.tc.logging.TCLogging;
import com.tc.object.lockmanager.api.LockFlushCallback;
import com.tc.object.lockmanager.api.LockID;
import com.tc.object.lockmanager.api.LockLevel;
import com.tc.object.lockmanager.api.LockNotPendingError;
import com.tc.object.lockmanager.api.LockRequest;
import com.tc.object.lockmanager.api.Notify;
import com.tc.object.lockmanager.api.RemoteLockManager;
import com.tc.object.lockmanager.api.ThreadID;
import com.tc.object.lockmanager.api.WaitListener;
import com.tc.object.lockmanager.api.WaitLockRequest;
import com.tc.object.lockmanager.api.WaitTimer;
import com.tc.object.lockmanager.api.WaitTimerCallback;
import com.tc.object.tx.WaitInvocation;
import com.tc.util.Assert;
import com.tc.util.State;
import com.tc.util.TCAssertionError;
import com.tc.util.Util;

import gnu.trove.TIntIntHashMap;
import gnu.trove.TIntStack;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimerTask;
import java.util.Map.Entry;

/**
 * Client-side bookkeeping for a single lock, identified by a {@link LockID}.
 * <p>
 * The instance tracks three things:
 * <ul>
 * <li>{@code holders}: a {@link LockHold} per {@link ThreadID} that holds, waits on, or is pending on the lock;</li>
 * <li>{@code pendingLockRequests}: the outstanding {@link LockRequest} / {@link WaitLockRequest} per thread, in
 * insertion order;</li>
 * <li>{@code greediness}: whether this client currently owns the lock "greedily" (awarded to
 * {@link ThreadID#VM_ID}), in which case requests are satisfied locally without calling the
 * {@link RemoteLockManager}.</li>
 * </ul>
 * <p>
 * Locking discipline: mutable state is guarded by the monitor of {@code this}. {@code holders} is additionally a
 * synchronized map so the {@code isHeldBy(...)} family can be queried while holding only the map's monitor (this is
 * what the blocked threads in {@link #waitForLock} do). Whenever both monitors are taken, the order is always
 * {@code this} first, then {@code holders}.
 */
class ClientLock implements WaitTimerCallback, LockFlushCallback {

  private static final TCLogger   logger                   = TCLogging.getLogger(ClientLock.class);

  private static final State      RUNNING                  = new State("RUNNING");
  private static final State      PAUSED                   = new State("PAUSED");

  // ThreadID -> LockHold. Structural changes happen only while holding the monitor of "this".
  private final Map               holders                  = Collections.synchronizedMap(new HashMap());
  // ThreadIDs whose tryLock request was rejected; guarded by its own monitor.
  private final Set               rejectedLockRequesterIDs = new HashSet();
  private final LockID            lockID;
  // ThreadID -> the monitor object the requesting/waiting thread blocks on.
  private final Map               waitLocksByRequesterID   = new HashMap();
  // ThreadID -> LockRequest (or WaitLockRequest), kept in insertion order.
  private final Map               pendingLockRequests      = new LinkedHashMap();
  // WaitLockRequest -> TimerTask for locally scheduled wait timeouts.
  private final Map               waitTimers               = new HashMap();
  private final RemoteLockManager remoteLockManager;
  private final WaitTimer         waitTimer;

  private final Greediness        greediness               = new Greediness();
  private int                     useCount                 = 0;
  private volatile State          state                    = RUNNING;
  private long                    timeUsed                 = System.currentTimeMillis();

  /**
   * @param lockID identity of the lock; must not be null
   * @param remoteLockManager used for every interaction that leaves this client (request, release, recall, flush)
   * @param waitTimer used to schedule local timeouts for timed waits while the lock is held greedily
   */
  ClientLock(LockID lockID, RemoteLockManager remoteLockManager, WaitTimer waitTimer) {
    Assert.assertNotNull(lockID);
    this.lockID = lockID;
    this.remoteLockManager = remoteLockManager;
    this.waitTimer = waitTimer;
  }

  /**
   * Attempts to acquire the lock without blocking behind another local holder.
   *
   * @return true if, after the attempt, {@code threadID} holds the lock at level {@code type}
   */
  public boolean tryLock(ThreadID threadID, int type) {
    lock(threadID, type, true);
    return isHeldBy(threadID, type);
  }

  /**
   * Acquires the lock at level {@code type} for {@code threadID}, blocking until it is awarded.
   */
  public void lock(ThreadID threadID, int type) {
    lock(threadID, type, false);
  }

  /**
   * Common entry point for {@link #lock(ThreadID, int)} and {@link #tryLock(ThreadID, int)}.
   * <p>
   * A synchronous level is acquired as a plain WRITE and then the resulting hold is flagged as synchronous; only
   * synchronous WRITE is accepted.
   */
  private void lock(ThreadID threadID, int type, boolean noBlock) {
    int lockType = type;
    if (LockLevel.isSynchronous(type)) {
      if (!LockLevel.isSynchronousWrite(type)) { throw new AssertionError(
                                                                          "Only Synchronous WRITE lock is supported now"); }
      lockType = LockLevel.WRITE;
    }
    basicLock(threadID, lockType, noBlock);
    if (lockType != type) {
      // The requested level was synchronous: mark the hold that was just awarded.
      awardSynchronous(threadID, lockType);
    }
  }

  /**
   * Acquires the (non-synchronous) level {@code type}.
   * <p>
   * Under the monitor the request is either satisfied immediately (nested acquire, concurrent level, or greedy
   * award), or registered as pending. A pending request is either forwarded to the remote lock manager (not greedy)
   * or, when the lock is held greedily, may trigger a recall of the greedy token. The calling thread then blocks
   * outside the monitor until the request is answered.
   */
  private void basicLock(ThreadID requesterID, int type, boolean noBlock) {
    final Object waitLock;
    final Action action = new Action();
    synchronized (this) {
      waitUntillRunning();

      // tryLock: give up right away if some other local thread holds the lock.
      if (noBlock && isHeld() && !isHeldBy(requesterID)) { return; }
      if (isHeldBy(requesterID)) {
        // Nested acquire by a thread that already holds this lock.
        if (isConcurrentWriteLock(requesterID)) {
          throw new AssertionError("Don't currently support nested concurrent write locks");
        }

        if (isHeldBy(requesterID, LockLevel.WRITE)) {
          // A WRITE holder may re-acquire at any level locally.
          award(requesterID, type);
          return;
        }

        if (LockLevel.isRead(type) && isHeldBy(requesterID, LockLevel.READ)) {
          // READ nested under READ is awarded locally.
          award(requesterID, type);
          return;
        }
      }
      if (LockLevel.isConcurrent(type)) {
        // Concurrent levels are always awarded locally.
        award(requesterID, type);
        return;
      }

      if (canAwardGreedilyNow(requesterID, type)) {
        award(requesterID, type);
        return;
      }

      waitLock = addToPendingLockRequest(requesterID, type, noBlock);
      if (greediness.isNotGreedy()) {
        if (noBlock) {
          remoteLockManager.tryRequestLock(lockID, requesterID, type);
        } else {
          remoteLockManager.requestLock(lockID, requesterID, type);
        }
      } else {
        // Greedy (or already being recalled) but the request cannot be satisfied locally.
        if (isGreedyRecallNeeded(requesterID, type)) {
          greediness.recall(type);
        }
        if (canProceedWithRecall()) {
          greediness.startRecallCommit();
          action.addAction(Action.RECALL_COMMIT);
        }
      }
    }
    if (action.doRecallCommit()) {
      // flush() is invoked outside the monitor, before the recall is committed.
      flush();
      recallCommit();
    }

    boolean isInterrupted = false;
    if (noBlock) {
      isInterrupted = waitForTryLock(requesterID, waitLock);
    } else {
      isInterrupted = waitForLock(requesterID, type, waitLock);
    }
    Util.selfInterruptIfNeeded(isInterrupted);
  }

  /**
   * Releases the most recent hold of {@code threadID}.
   * <p>
   * The required action is computed under the monitor, {@code flush()} is invoked outside it when needed, and the
   * action is then recomputed under the monitor. The unlock is applied only if the action did not change in between;
   * otherwise the whole sequence is retried.
   */
  public void unlock(ThreadID threadID) {
    Action action;
    boolean changed;

    do {
      changed = false;
      synchronized (this) {
        waitUntillRunning();
        action = unlockAction(threadID);
      }

      if (action.doRemoteLockRequest() || action.doRecallCommit() || action.doSynchronousCommit()) {
        flush();
      }

      synchronized (this) {
        // Re-evaluate: state may have changed while the monitor was released for flush().
        Action newAction = unlockAction(threadID);
        if (action.equals(newAction)) {
          removeCurrent(threadID);
          if (action.doAwardGreedyLocks()) {
            awardLocksGreedily();
          } else if (action.doRecallCommit()) {
            greediness.startRecallCommit();
            recallCommit();
          } else if (action.doRemoteLockRequest()) {
            remoteLockManager.releaseLock(lockID, threadID);
          }
        } else {
          changed = true;
          logger.debug(lockID + " :: unlock() : " + threadID + " STATE CHANGED - From = " + action + " To = "
                       + newAction + " - retrying ...");
        }
      }
    } while (changed);
  }

  /**
   * Computes what an unlock by {@code threadID} would have to do in the current state. Callers hold the monitor.
   */
  private Action unlockAction(ThreadID threadID) {
    final Action action = new Action();
    boolean remote = isRemoteUnlockRequired(threadID);
    if (greediness.isNotGreedy() && remote) {
      action.addAction(Action.REMOTE_LOCK_REQUEST);
    } else if (remote && canProceedWithRecall(threadID)) {
      action.addAction(Action.RECALL_COMMIT);
    } else if (greediness.isGreedy()) {
      action.addAction(Action.AWARD_GREEDY_LOCKS);
    }
    if (isLockSynchronouslyHeld(threadID)) {
      action.addAction(Action.SYNCHRONOUS_COMMIT);
    }
    return action;
  }

  /**
   * Moves {@code threadID} from holding the lock into the wait state and blocks on {@code waitLock} until the lock is
   * held again at the level it was given up at.
   * <p>
   * Uses the same compute / flush / recompute-and-retry protocol as {@link #unlock(ThreadID)}.
   *
   * @param call the wait invocation; passed to the remote lock manager or used for the local timeout
   * @param waitLock monitor object the calling thread blocks on
   * @param listener notified (via {@code handleWaitEvent()}) after the wait is registered and before blocking; must
   *        not be null
   * @throws InterruptedException if the thread was interrupted while blocked
   * @throws IllegalMonitorStateException if {@code threadID} does not hold a WRITE lock
   */
  public void wait(ThreadID threadID, WaitInvocation call, Object waitLock, WaitListener listener)
      throws InterruptedException {
    Action action;
    boolean changed;
    int server_level = LockLevel.NIL_LOCK_LEVEL;
    if (listener == null) { throw new AssertionError("Null WaitListener passed."); }

    do {
      changed = false;

      synchronized (this) {
        waitUntillRunning();
        checkValidWaitNotifyState(threadID);
        action = waitAction(threadID);
      }

      if (action.doRemoteLockRequest() || action.doRecallCommit() || action.doSynchronousCommit()) {
        flush();
      }

      synchronized (this) {
        // Re-evaluate: state may have changed while the monitor was released for flush().
        Action newAction = waitAction(threadID);
        if (action.equals(newAction)) {
          LockHold holder = (LockHold) this.holders.get(threadID);
          Assert.assertNotNull(holder);
          server_level = holder.goToWaitState();

          Object prev = waitLocksByRequesterID.put(threadID, waitLock);
          Assert.eval(prev == null);

          WaitLockRequest waitLockRequest = new WaitLockRequest(lockID, threadID, server_level, call);

          if (this.pendingLockRequests.put(threadID, waitLockRequest) != null) {
            throw new AssertionError("WaitLockRequest already pending: " + waitLockRequest);
          }

          if (action.doAwardGreedyLocks()) {
            // Greedy: the timeout is tracked locally and other local requesters may now proceed.
            scheduleWaitTimeout(waitLockRequest);
            awardLocksGreedily();
          } else if (action.doRecallCommit()) {
            greediness.startRecallCommit();
            recallCommit();
          } else if (action.doRemoteLockRequest()) {
            remoteLockManager.releaseLockWait(lockID, threadID, call);
          }
        } else {
          changed = true;
          logger.debug(lockID + " :: wait() : " + threadID + " : STATE CHANGED - From = " + action + " To = "
                       + newAction + " - retrying ...");
        }
      }
    } while (changed);

    listener.handleWaitEvent();
    if (waitForLock(threadID, server_level, waitLock)) { throw new InterruptedException(); }
  }

  /**
   * Computes what a wait by {@code threadID} would have to do in the current state. Callers hold the monitor.
   */
  private Action waitAction(ThreadID threadID) {
    final Action action = new Action();
    if (greediness.isNotGreedy()) {
      action.addAction(Action.REMOTE_LOCK_REQUEST);
    } else if (canProceedWithRecall(threadID)) {
      action.addAction(Action.RECALL_COMMIT);
    } else if (greediness.isGreedy()) {
      action.addAction(Action.AWARD_GREEDY_LOCKS);
    }
    if (isLockSynchronouslyHeld(threadID)) {
      action.addAction(Action.SYNCHRONOUS_COMMIT);
    }
    return action;
  }

  /**
   * Handles a notify / notifyAll issued by {@code threadID}.
   *
   * @return a {@link Notify} describing the notification when it still has to be applied remotely, otherwise
   *         {@link Notify#NULL}
   * @throws IllegalMonitorStateException if {@code threadID} does not hold a WRITE lock
   */
  public synchronized Notify notify(ThreadID threadID, boolean all) {
    boolean isRemote;
    waitUntillRunning();
    checkValidWaitNotifyState(threadID);
    if (!greediness.isNotGreedy()) {
      isRemote = notifyLocalWaits(threadID, all);
    } else {
      isRemote = true;
    }
    return isRemote ? new Notify(lockID, threadID, all) : Notify.NULL;
  }

  /**
   * Called when a thread blocked in {@link #waitForLock} is interrupted. If the thread is a waiter, it is converted
   * into a pending lock requester and either awarded locally or the interruption is reported to the remote lock
   * manager.
   */
  private synchronized void handleInteruptIfWait(ThreadID threadID) {
    LockRequest lockRequest = (LockRequest) pendingLockRequests.get(threadID);
    if (!(lockRequest instanceof WaitLockRequest)) { return; }
    movedToPending(threadID);
    if (canAwardGreedilyNow(threadID, lockRequest.lockLevel())) {
      awardLock(threadID, lockRequest.lockLevel());
      return;
    }
    if (greediness.isNotGreedy()) {
      this.remoteLockManager.interrruptWait(lockID, threadID);
    }
  }

  /**
   * Replaces the wait request of {@code threadID} with a plain pending lock request and cancels any local wait timer.
   * Callers hold the monitor.
   */
  private void movedToPending(ThreadID threadID) {
    LockHold holder = (LockHold) this.holders.get(threadID);
    Assert.assertNotNull(holder);
    int server_level = holder.goToPending();
    LockRequest pending = new LockRequest(lockID, threadID, server_level);
    LockRequest waiter = (LockRequest) this.pendingLockRequests.remove(threadID);
    if (waiter == null) {
      logger.warn("Pending request " + pending + " is not present: " + waiter);
      return;
    }
    if (waiter instanceof WaitLockRequest) {
      cancelTimer((WaitLockRequest) waiter);
    } else {
      logger.warn("Pending request " + pending + " is not a waiter: " + waiter);
    }
    this.pendingLockRequests.put(threadID, pending);
  }

  /**
   * Marks the waiter {@code threadID} as notified: it stops waiting and becomes pending for the lock.
   */
  public synchronized void notified(ThreadID threadID) {
    movedToPending(threadID);
  }

  /**
   * Requests that the greedily held lock be given back. Has no effect unless the lock is currently greedy. The recall
   * is committed immediately when nothing local blocks it and the remote lock manager reports the lock's transactions
   * as flushed; otherwise it is completed later (see {@link #transactionsForLockFlushed(LockID)}, unlock and wait).
   *
   * @param interestedLevel the level the recall is for
   * @param callback passed to the remote lock manager's flushed check
   */
  public synchronized void recall(int interestedLevel, LockFlushCallback callback) {
    if (greediness.isGreedy()) {
      greediness.recall(interestedLevel);
      if (canProceedWithRecall()) {
        greediness.startRecallCommit();
        if (isTransactionsForLockFlushed(callback)) {
          recallCommit();
        }
      }
    }
  }

  /**
   * Rejects the pending (try) lock request of {@code threadID} and wakes the requesting thread.
   *
   * @throws LockNotPendingError if no thread is blocked for {@code threadID} (and it is not the VM id)
   */
  public void cannotAwardLock(ThreadID threadID, int level) {
    final Object waitLock;
    synchronized (this) {
      waitLock = waitLocksByRequesterID.remove(threadID);
      if (waitLock == null && !threadID.equals(ThreadID.VM_ID)) {
        throw new LockNotPendingError("Attempt to reject a lock request that isn't pending: lockID: " + lockID
                                      + ", level: " + level + ", requesterID: " + threadID);
      }
      LockRequest lockRequest = (LockRequest) pendingLockRequests.remove(threadID);
      if (lockRequest == null) {
        throw new AssertionError("Attempt to remove a pending lock request that wasn't pending; lockID: " + lockID
                                 + ", level: " + level + ", requesterID: " + threadID);
      }
    }
    synchronized (waitLock) {
      reject(threadID);
      waitLock.notifyAll();
    }
  }

  /** Records that the request of {@code threadID} was rejected (consumed by {@link #isLockRequestResponded}). */
  private void reject(ThreadID threadID) {
    synchronized (rejectedLockRequesterIDs) {
      rejectedLockRequesterIDs.add(threadID);
    }
  }

  /**
   * Awards the lock.
   * <p>
   * A greedy level must be addressed to {@link ThreadID#VM_ID}: the level is added to the greedy token and every
   * pending local request that can now be satisfied is awarded. Otherwise the pending request of {@code threadID} is
   * removed, the hold is recorded and the blocked thread is woken.
   *
   * @throws LockNotPendingError if no thread is blocked for {@code threadID} (and it is not the VM id)
   */
  public void awardLock(ThreadID threadID, int level) {
    final Object waitLock;
    synchronized (this) {
      waitLock = waitLocksByRequesterID.remove(threadID);
      if (waitLock == null && !threadID.equals(ThreadID.VM_ID)) {
        throw new LockNotPendingError("Attempt to award a lock that isn't pending: lockID: " + lockID + ", level: "
                                      + level + ", requesterID: " + threadID);
      }
      if (LockLevel.isGreedy(level)) {
        Assert.assertEquals(threadID, ThreadID.VM_ID);
        final int nlevel = LockLevel.makeNotGreedy(level);
        greediness.add(nlevel);
        awardLocksGreedily();
        return;
      }

      LockRequest lockRequest = (LockRequest) pendingLockRequests.remove(threadID);
      if (lockRequest == null) {
        throw new AssertionError("Attempt to remove a pending lock request that wasn't pending; lockID: " + lockID
                                 + ", level: " + level + ", requesterID: " + threadID);
      }
    }

    // Record the hold and wake the requester under the requester's own monitor, so the blocked thread's
    // check-then-wait in waitForLock()/waitForTryLock() cannot miss the notification.
    synchronized (waitLock) {
      award(threadID, level);
      waitLock.notifyAll();
    }
  }

  /**
   * Registers a new pending request for {@code threadID} and creates the monitor object the thread will block on.
   *
   * @return the newly created monitor object
   */
  private synchronized Object addToPendingLockRequest(ThreadID threadID, int lockLevel, boolean noBlock) {
    LockRequest lockRequest = new LockRequest(lockID, threadID, lockLevel, noBlock);
    Object old = pendingLockRequests.put(threadID, lockRequest);
    if (old != null) {
      throw new AssertionError("Lock request already outstandind - " + old);
    }

    Object o = new Object();
    Object prev = waitLocksByRequesterID.put(threadID, o);
    if (prev != null) { throw new AssertionError("Assert Failed : Previous value is not null. Prev = " + prev
                                                 + " Thread id = " + threadID); }
    return o;
  }

  /**
   * Blocks until the try-lock request of {@code threadID} is either awarded or rejected.
   *
   * @return true if the thread was interrupted while blocked (the interrupt is not acted upon here)
   */
  private boolean waitForTryLock(ThreadID threadID, Object waitLock) {
    boolean isInterrupted = false;
    synchronized (waitLock) {
      while (!isLockRequestResponded(threadID)) {
        try {
          waitLock.wait();
        } catch (InterruptedException ioe) {
          isInterrupted = true;
        }
      }
    }
    return isInterrupted;
  }

  /**
   * @return true if {@code threadID} now holds the lock, or if its request was rejected (the rejection marker is
   *         consumed by this call)
   */
  private boolean isLockRequestResponded(ThreadID threadID) {
    if (isHeldBy(threadID)) { return true; }
    synchronized (rejectedLockRequesterIDs) {
      return rejectedLockRequesterIDs.remove(threadID);
    }
  }

  /**
   * Blocks until {@code threadID} holds the lock at level {@code type}.
   *
   * @return true if the thread was interrupted while blocked; on the first interrupt a waiter is converted into a
   *         pending requester via {@link #handleInteruptIfWait(ThreadID)}
   */
  private boolean waitForLock(ThreadID threadID, int type, Object waitLock) {
    boolean isInterrupted = false;
    while (!isHeldBy(threadID, type)) {
      try {
        synchronized (waitLock) {
          // Re-check under the monitor: awardLock() awards and notifies while holding waitLock.
          if (!isHeldBy(threadID, type)) {
            waitLock.wait();
          }
        }
      } catch (InterruptedException ioe) {
        if (!isInterrupted) {
          isInterrupted = true;
          handleInteruptIfWait(threadID);
        }
      } catch (Throwable e) {
        throw new TCRuntimeException(e);
      }
    }
    return isInterrupted;
  }

  /** Schedules a local timeout for the given wait request; the timer task is remembered so it can be cancelled. */
  private synchronized void scheduleWaitTimeout(WaitLockRequest waitLockRequest) {
    final TimerTask timer = waitTimer.scheduleTimer(this, waitLockRequest.getWaitInvocation(), waitLockRequest);
    if (timer != null) {
      waitTimers.put(waitLockRequest, timer);
    }
  }

  /**
   * Awards every pending (non-wait) request that can currently be satisfied from the greedy token, in insertion
   * order. Iterates over a copy because {@link #awardLock(ThreadID, int)} removes entries.
   */
  private synchronized void awardLocksGreedily() {
    List copy = new ArrayList(pendingLockRequests.values());
    for (Iterator i = copy.iterator(); i.hasNext();) {
      Object o = i.next();
      if (o instanceof WaitLockRequest) continue;
      LockRequest lr = (LockRequest) o;
      if (canAwardGreedilyNow(lr.threadID(), lr.lockLevel())) {
        awardLock(lr.threadID(), lr.lockLevel());
      }
    }
  }

  /**
   * @return whether releasing the current hold of {@code threadID} needs an action beyond local bookkeeping; always
   *         false for concurrent levels
   */
  private synchronized boolean isRemoteUnlockRequired(ThreadID threadID) {
    LockHold holder = (LockHold) this.holders.get(threadID);
    Assert.assertNotNull(holder);

    if (LockLevel.isConcurrent(holder.getLevel())) { return false; }

    return holder.isRemoteUnlockRequired();
  }

  /**
   * Pops the most recent hold of {@code threadID}; the holder entry is dropped once nothing is held any more.
   *
   * @return the result of {@link LockHold#removeCurrent()}, or false for concurrent levels
   */
  private synchronized boolean removeCurrent(ThreadID threadID) {
    LockHold holder = (LockHold) this.holders.get(threadID);
    Assert.assertNotNull(holder);

    if (LockLevel.isConcurrent(holder.getLevel())) {
      holder.removeCurrent();

      if (holder.getLevel() == LockLevel.NIL_LOCK_LEVEL) {
        this.holders.remove(threadID);
      }

      return false;
    }

    boolean rv = holder.removeCurrent();

    if (holder.getLevel() == LockLevel.NIL_LOCK_LEVEL) {
      this.holders.remove(threadID);
    }

    return rv;
  }

  /**
   * @throws IllegalMonitorStateException if {@code threadID} does not hold a WRITE lock
   */
  private void checkValidWaitNotifyState(ThreadID threadID) {
    if (!isHeldBy(threadID, LockLevel.WRITE)) {
      throw new IllegalMonitorStateException("The current Thread (" + threadID + ") does not hold a WRITE lock for "
                                             + lockID);
    }
  }

  /**
   * Notifies local waiters. Iterates over a copy because {@link #notified(ThreadID)} modifies the pending map.
   *
   * @return false if {@code all} is false and one waiter was notified; true otherwise (including when no waiter was
   *         found)
   */
  private synchronized boolean notifyLocalWaits(ThreadID threadID, boolean all) {
    for (Iterator i = new HashSet(pendingLockRequests.values()).iterator(); i.hasNext();) {
      Object o = i.next();
      if (o instanceof WaitLockRequest) {
        WaitLockRequest wlr = (WaitLockRequest) o;
        notified(wlr.threadID());
        if (!all) { return false; }
      }
    }
    return true;
  }

  private boolean isTransactionsForLockFlushed(LockFlushCallback callback) {
    return remoteLockManager.isTransactionsForLockFlushed(lockID, callback);
  }

  /**
   * {@link LockFlushCallback} entry point: completes a recall that was waiting for a flush.
   */
  public void transactionsForLockFlushed(LockID id) {
    Assert.assertEquals(lockID, id);
    recallCommit();
  }

  /**
   * Completes a recall that is in progress: resets the greedy token, cancels local wait timers and reports the
   * current holders, waiters and pending requests to the remote lock manager. Does nothing (other than a debug log)
   * when no recall is in progress.
   */
  private synchronized void recallCommit() {
    if (greediness.isRecallInProgress()) {
      // recallComplete() makes the token NOT_GREEDY first, so the add*To() helpers below collect per-thread state.
      greediness.recallComplete();
      cancelTimers();
      remoteLockManager.recallCommit(lockID, addHoldersToAsLockRequests(new ArrayList()),
                                     addAllWaitersTo(new ArrayList()), addAllPendingLockRequestsTo(new ArrayList()));
    } else {
      logger.debug(lockID + " : recallCommit() : skipping as the state is not RECALL_IN_PROGRESS !");
    }
  }

  private void flush() {
    remoteLockManager.flush(lockID);
  }

  /**
   * Adds every {@link WaitLockRequest} to {@code c}; adds nothing while the lock is not in the NOT_GREEDY state.
   *
   * @return {@code c}
   */
  public synchronized Collection addAllWaitersTo(Collection c) {
    if (greediness.isNotGreedy()) {
      for (Iterator i = pendingLockRequests.values().iterator(); i.hasNext();) {
        Object o = i.next();
        if (o instanceof WaitLockRequest) {
          c.add(o);
        }
      }
    }
    return c;
  }

  /**
   * Adds the current holders to {@code c} as {@link LockRequest}s. When not greedy, one request is added per thread
   * that is holding with a non-NIL server level; otherwise a single request for {@link ThreadID#VM_ID} at the greedy
   * level is added.
   *
   * @return {@code c}
   */
  public synchronized Collection addHoldersToAsLockRequests(Collection c) {
    if (greediness.isNotGreedy()) {
      // Views of a synchronizedMap must be iterated while holding the map's own monitor.
      synchronized (holders) {
        for (Iterator i = holders.entrySet().iterator(); i.hasNext();) {
          Entry e = (Entry) i.next();
          ThreadID threadID = (ThreadID) e.getKey();
          LockHold hold = (LockHold) e.getValue();
          if (hold.isHolding() && hold.getServerLevel() != LockLevel.NIL_LOCK_LEVEL) {
            c.add(new LockRequest(this.lockID, threadID, hold.getServerLevel()));
          }
        }
      }
    } else {
      c.add(new LockRequest(this.lockID, ThreadID.VM_ID, greediness.getLevel()));
    }
    return c;
  }

  /**
   * Adds every pending non-wait {@link LockRequest} to {@code c}; adds nothing while the lock is not in the
   * NOT_GREEDY state.
   *
   * @return {@code c}
   */
  public synchronized Collection addAllPendingLockRequestsTo(Collection c) {
    if (greediness.isNotGreedy()) {
      for (Iterator i = pendingLockRequests.values().iterator(); i.hasNext();) {
        LockRequest request = (LockRequest) i.next();
        if (request instanceof WaitLockRequest) continue;
        c.add(request);
      }
    }
    return c;
  }

  /** Increments the use count and refreshes the last-used timestamp. */
  public synchronized void incUseCount() {
    if (useCount == Integer.MAX_VALUE) { throw new AssertionError("Lock use count cannot exceed integer max value"); }
    useCount++;
    timeUsed = System.currentTimeMillis();
  }

  /** Decrements the use count and refreshes the last-used timestamp. */
  public synchronized void decUseCount() {
    if (useCount == 0) { throw new AssertionError("Lock use count is zero"); }
    useCount--;
    timeUsed = System.currentTimeMillis();
  }

  public synchronized int getUseCount() {
    return useCount;
  }

  /** Cancels and forgets the local timeout of the given wait request, if one was scheduled. */
  private synchronized void cancelTimer(WaitLockRequest request) {
    TimerTask timer = (TimerTask) waitTimers.remove(request);
    if (timer != null) {
      timer.cancel();
    }
  }

  /** Cancels every locally scheduled wait timeout. Iterates over a copy because cancelTimer() removes entries. */
  private synchronized void cancelTimers() {
    Collection copy = new ArrayList(waitTimers.keySet());
    for (Iterator iter = copy.iterator(); iter.hasNext();) {
      WaitLockRequest wlr = (WaitLockRequest) iter.next();
      cancelTimer(wlr);
    }
  }

  /**
   * @return whether a recalled greedy lock can be given back right now, without excluding any thread
   */
  private synchronized boolean canProceedWithRecall() {
    return canProceedWithRecall(ThreadID.NULL_ID);
  }

  /**
   * A recall can proceed when every holder that blocks it is either {@code threadID} itself (the thread about to
   * release or wait) or also has a pending non-wait request registered.
   *
   * @param threadID holder to disregard, or {@link ThreadID#NULL_ID} for none
   * @return false when the token is not in the RECALLED state
   */
  private synchronized boolean canProceedWithRecall(ThreadID threadID) {
    if (greediness.isRecalled()) {
      Map map = addRecalledHoldersTo(new HashMap());
      if (threadID != ThreadID.NULL_ID) {
        map.remove(threadID);
      }
      for (Iterator i = pendingLockRequests.values().iterator(); i.hasNext() && map.size() != 0;) {
        Object o = i.next();
        if (o instanceof WaitLockRequest) {
          continue;
        }
        if (o instanceof LockRequest) {
          LockRequest lr = (LockRequest) o;
          map.remove(lr.threadID());
        }
      }
      return (map.size() == 0);
    }
    return false;
  }

  /**
   * Records that {@code threadID} now holds the lock at {@code level}: creates the hold, adds a nested level to an
   * active hold, or moves a PENDING hold back to HOLDING.
   */
  private synchronized void award(ThreadID threadID, int level) {
    LockHold holder = (LockHold) this.holders.get(threadID);
    if (holder == null) {
      holders.put(threadID, new LockHold(this.lockID, level));
    } else if (holder.isHolding()) {
      holder.add(level);
    } else {
      try {
        holder.goToHolding(level);
      } catch (TCAssertionError er) {
        logger.warn("Lock in wrong STATE for holder - (" + threadID + ", " + LockLevel.toString(level) + ") - " + this);
        throw er;
      }
    }
  }

  /** Flags the most recent award of {@code threadID} as synchronous, provided it holds {@code lockLevel}. */
  private synchronized void awardSynchronous(ThreadID threadID, int lockLevel) {
    LockHold holder = (LockHold) this.holders.get(threadID);
    if (holder != null && holder.isHolding() && ((holder.getLevel() & lockLevel) == lockLevel)) {
      holder.makeLastAwardSynchronous(lockLevel);
    }
  }

  /**
   * Decides whether a request can be satisfied locally from the greedy token:
   * <ul>
   * <li>WRITE request, greedy WRITE, and nobody holds the lock or the requester is the only holder;</li>
   * <li>READ request, greedy WRITE, and no local thread holds WRITE;</li>
   * <li>READ request, greedy READ.</li>
   * </ul>
   * Always false unless the token is in the GREEDY state.
   */
  private synchronized boolean canAwardGreedilyNow(ThreadID threadID, int level) {
    if (greediness.isGreedy()) {
      if (LockLevel.isWrite(level) && greediness.isWrite() && (!isHeld() || ((heldCount() == 1) && isHeldBy(threadID)))) {
        return true;
      } else if (LockLevel.isRead(level) && greediness.isWrite() && !isWriteHeld()) {
        return true;
      } else if (LockLevel.isRead(level) && greediness.isRead()) {
        return true;
      }
    }
    return false;
  }

  private boolean isLockSynchronouslyHeld(ThreadID threadID) {
    LockHold holder = (LockHold) this.holders.get(threadID);
    if (holder != null && holder.isHolding()) { return holder.isLastLockSynchronouslyHeld(); }
    return false;
  }

  /**
   * @return true when the token is GREEDY for READ only and a WRITE is requested, i.e. the greedy token has to be
   *         recalled before the request can be served
   */
  private synchronized boolean isGreedyRecallNeeded(ThreadID threadID, int level) {
    if (greediness.isGreedy()) {
      if (LockLevel.isWrite(level) && greediness.isReadOnly()) { return true; }
    }
    return false;
  }

  private boolean isWriteHeld() {
    synchronized (holders) {
      for (Iterator it = holders.values().iterator(); it.hasNext();) {
        LockHold holder = (LockHold) it.next();
        if (holder.isHolding() && LockLevel.isWrite(holder.getLevel())) { return true; }
      }
      return false;
    }
  }

  /**
   * Collects the threads whose current hold blocks the recall: every active holder, except READ holders when the
   * recalled level is exactly READ.
   *
   * @param map receives {@code ThreadID -> ThreadID} entries
   * @return {@code map}
   */
  private synchronized Map addRecalledHoldersTo(Map map) {
    Assert.assertTrue(greediness.isRecalled());
    // Views of a synchronizedMap must be iterated while holding the map's own monitor.
    synchronized (holders) {
      for (Iterator it = holders.entrySet().iterator(); it.hasNext();) {
        Entry e = (Entry) it.next();
        ThreadID id = (ThreadID) e.getKey();
        LockHold holder = (LockHold) e.getValue();
        if (!holder.isHolding()) continue;
        if ((greediness.getRecalledLevel() == LockLevel.READ) && LockLevel.isRead(holder.getLevel())) {
          continue;
        }
        map.put(id, id);
      }
    }
    return map;
  }

  /**
   * {@link WaitTimerCallback} entry point for a locally scheduled wait timeout. The timed-out waiter is treated as
   * notified when the token level includes WRITE and the thread is still waiting; anything else is logged and
   * ignored.
   */
  public synchronized void waitTimeout(Object callbackObject) {
    waitUntillRunning();
    if (callbackObject instanceof WaitLockRequest) {
      WaitLockRequest wlr = (WaitLockRequest) callbackObject;
      LockID timedoutLockID = wlr.lockID();
      if (!lockID.equals(timedoutLockID)) { throw new AssertionError("WaitTimeout: LockIDs are not the same : "
                                                                     + lockID + " : " + timedoutLockID); }
      if (greediness.isWrite() && isWaiting(wlr.threadID())) {
        notified(wlr.threadID());
        awardLocksGreedily();
        return;
      }
    }
    logger.warn("Ignoring wait timeout for : " + callbackObject);
  }

  /**
   * @return true when there are no holders, no pending requests, no users, and the lock is not greedy
   */
  public synchronized boolean isClear() {
    return (holders.isEmpty() && greediness.isNotGreedy() && (pendingLockRequests.size() == 0) && (useCount == 0));
  }

  /**
   * @return true when the lock is greedy, completely idle (no holders, pending requests or users) and has not been
   *         used for longer than {@code ClientLockManagerImpl.TIMEOUT}
   */
  public boolean timedout() {
    // NOTE(review): unsynchronized read, presumably a cheap pre-check; useCount is re-read under the monitor below.
    if (useCount != 0) { return false; }
    synchronized (this) {
      return (holders.isEmpty() && greediness.isGreedy() && (pendingLockRequests.size() == 0) && (useCount == 0) && ((System
          .currentTimeMillis() - timeUsed) > ClientLockManagerImpl.TIMEOUT));
    }
  }

  private boolean isHeldBy(ThreadID threadID) {
    synchronized (holders) {
      LockHold holder = (LockHold) holders.get(threadID);
      if (holder != null) { return holder.isHolding(); }
      return false;
    }
  }

  /**
   * @return true if {@code threadID} is actively holding and its combined level includes every bit of {@code level}
   */
  public boolean isHeldBy(ThreadID threadID, int level) {
    synchronized (holders) {
      LockHold holder = (LockHold) holders.get(threadID);
      if (holder != null) { return ((holder.isHolding()) && ((holder.getLevel() & level) == level)); }
      return false;
    }
  }

  /**
   * @return true if any local thread is actively holding the lock
   */
  public boolean isHeld() {
    synchronized (holders) {
      for (Iterator it = holders.values().iterator(); it.hasNext();) {
        LockHold holder = (LockHold) it.next();
        if (holder.isHolding()) { return true; }
      }
      return false;
    }
  }

  private boolean isWaiting(ThreadID threadID) {
    synchronized (holders) {
      LockHold holder = (LockHold) holders.get(threadID);
      if (holder != null) { return holder.isWaiting(); }
      return false;
    }
  }

  /** @return the number of threads actively holding the lock */
  private int heldCount() {
    int count = 0;
    synchronized (holders) {
      for (Iterator it = holders.values().iterator(); it.hasNext();) {
        LockHold holder = (LockHold) it.next();
        if (holder.isHolding()) {
          count++;
        }
      }
    }
    return count;
  }

  /**
   * @return how many times {@code threadID} currently holds the lock at exactly {@code lockLevel}; 0 if it has no
   *         hold
   */
  public int localHeldCount(ThreadID threadID, int lockLevel) {
    LockHold holder;
    synchronized (holders) {
      holder = (LockHold) holders.get(threadID);
    }
    if (holder == null) return 0;
    else return holder.heldCount(lockLevel);
  }

  /** @return the number of pending requests that are not wait requests */
  public synchronized int queueLength() {
    int count = 0;
    for (Iterator i = pendingLockRequests.values().iterator(); i.hasNext();) {
      Object o = i.next();
      if (!(o instanceof WaitLockRequest)) count++;
    }
    return count;
  }

  /** @return the number of pending wait requests */
  public synchronized int waitLength() {
    int localCount = 0;
    for (Iterator i = pendingLockRequests.values().iterator(); i.hasNext();) {
      Object o = i.next();
      if (o instanceof WaitLockRequest) {
        localCount++;
      }
    }
    return localCount;
  }

  // NOTE(review): not referenced anywhere in this class.
  private boolean localWaiterExist(ThreadID waiterThreadID, int waiterLockLevel) {
    Object request = pendingLockRequests.get(waiterThreadID);
    if (request instanceof WaitLockRequest) {
      WaitLockRequest waitLockRequest = (WaitLockRequest) request;
      return waitLockRequest.lockID().equals(lockID) && waitLockRequest.lockLevel() == waiterLockLevel;
    }
    return false;
  }

  public LockID getLockID() {
    return lockID;
  }

  /** Identity is defined by the {@link LockID} alone. */
  public int hashCode() {
    return this.lockID.hashCode();
  }

  /** Two client locks are equal when their {@link LockID}s are equal. */
  public boolean equals(Object obj) {
    if (obj instanceof ClientLock) {
      ClientLock lock = (ClientLock) obj;
      return lock.lockID.equals(lockID);
    }
    return false;
  }

  public String toString() {
    return "Lock@" + System.identityHashCode(this) + " [ " + lockID + " ] : Holders = " + holders
           + " : PendingLockRequest : " + pendingLockRequests + " : Use count : " + useCount + " : state : " + state
           + " : " + greediness;
  }

  /** @return true if {@code threadID} has a hold whose combined level is a concurrent level */
  private boolean isConcurrentWriteLock(ThreadID threadID) {
    LockHold holder = (LockHold) holders.get(threadID);
    if (holder != null) { return LockLevel.isConcurrent(holder.getLevel()); }
    return false;
  }

  /** Pauses the lock: operations that call {@link #waitUntillRunning()} block until {@link #unpause()}. */
  public void pause() {
    state = PAUSED;
  }

  /** Resumes the lock and wakes every thread blocked in {@link #waitUntillRunning()}. */
  public synchronized void unpause() {
    state = RUNNING;
    notifyAll();
  }

  /**
   * Blocks while the lock is paused. Interrupts received while blocked are remembered and the thread's interrupt
   * status is restored via {@code Util.selfInterruptIfNeeded} once the lock is running again.
   */
  private synchronized void waitUntillRunning() {
    boolean isInterrupted = false;
    while (state != RUNNING) {
      try {
        wait();
      } catch (InterruptedException e) {
        isInterrupted = true;
      }
    }
    Util.selfInterruptIfNeeded(isInterrupted);
  }

  /**
   * The holds of one thread on this lock.
   * <p>
   * {@code levels} is the stack of awarded levels (most recent on top), {@code counts} the number of holds per
   * level, and {@code level} the OR of all held levels. {@code server_level} tracks only READ / WRITE holds
   * (presumably the level known to the server; verify against the server-side protocol).
   * <p>
   * State transitions: HOLDING to WAITING ({@link #goToWaitState()}), WAITING to PENDING ({@link #goToPending()}),
   * PENDING to HOLDING ({@link #goToHolding(int)}).
   */
  private static class LockHold {
    private static final State   HOLDING = new State("HOLDING");
    private static final State   WAITING = new State("WAITING");
    private static final State   PENDING = new State("PENDING");

    private int                  level;
    private int                  server_level;
    private State                state;
    private final TIntIntHashMap counts  = new TIntIntHashMap();
    private final TIntStack      levels  = new TIntStack();
    private final LockID         lockID;

    LockHold(LockID lockID, int level) {
      this.lockID = lockID;
      Assert.eval("Non-discreet level " + level, LockLevel.isDiscrete(level));
      Assert.eval(level != LockLevel.NIL_LOCK_LEVEL);
      this.level = level;
      this.levels.push(level);
      this.counts.put(level, 1);
      initServerLevel();
      this.state = HOLDING;
    }

    // Only READ and WRITE are reflected in server_level; any other level leaves it NIL.
    private void initServerLevel() {
      if (level == LockLevel.READ || level == LockLevel.WRITE) {
        server_level = level;
      } else {
        server_level = LockLevel.NIL_LOCK_LEVEL;
      }
    }

    int getServerLevel() {
      return this.server_level;
    }

    int getLevel() {
      return this.level;
    }

    boolean isHolding() {
      return (state == HOLDING);
    }

    boolean isWaiting() {
      return (state == WAITING);
    }

    boolean isPending() {
      return (state == PENDING);
    }

    /** @return total number of (nested) holds */
    int heldCount() {
      return levels.size();
    }

    /** @return number of holds at exactly {@code lockLevel} */
    int heldCount(int lockLevel) {
      return this.counts.get(lockLevel);
    }

    /** Replaces the top of the level stack, which must equal {@code lockLevel}, with its synchronous variant. */
    void makeLastAwardSynchronous(int lockLevel) {
      int lastLevel = this.levels.pop();
      Assert.assertEquals(lockLevel, lastLevel);
      this.levels.push(LockLevel.makeSynchronous(lockLevel));
    }

    boolean isLastLockSynchronouslyHeld() {
      int lastLevel = this.levels.peek();
      return LockLevel.isSynchronous(lastLevel);
    }

    /** Pushes a nested hold at {@code lockLevel}. */
    void add(int lockLevel) {
      Assert.eval("Non-discreet level " + lockLevel, LockLevel.isDiscrete(lockLevel));

      this.levels.push(lockLevel);
      this.level |= lockLevel;
      Assert.eval(level != LockLevel.NIL_LOCK_LEVEL);
      // A READ is folded into server_level only while no WRITE is recorded there; a WRITE always is.
      if ((lockLevel == LockLevel.READ && (!LockLevel.isWrite(server_level))) || (lockLevel == LockLevel.WRITE)) {
        server_level |= lockLevel;
      }
      if (!this.counts.increment(lockLevel)) {
        this.counts.put(lockLevel, 1);
      }
    }

    /**
     * Non-mutating preview of {@link #removeCurrent()}.
     *
     * @return true if popping the top hold would release the last hold at that level and that level is WRITE or is
     *         the only level held
     */
    boolean isRemoteUnlockRequired() {
      Assert.eval(this.levels.size() > 0);
      int lastLevel = LockLevel.makeNotSynchronous(levels.peek());

      Assert.eval(this.counts.contains(lastLevel));
      int count = this.counts.get(lastLevel);
      Assert.eval(count > 0);

      count--;
      if (count > 0) { return false; }

      return lastLevel == LockLevel.WRITE || ((this.level ^ lastLevel) == LockLevel.NIL_LOCK_LEVEL);
    }

    /**
     * Pops the most recent hold.
     *
     * @return true if the last hold at that level was released and that level is WRITE or nothing is held any more
     */
    boolean removeCurrent() {
      Assert.eval(this.levels.size() > 0);
      int lastLevel = LockLevel.makeNotSynchronous(levels.pop());

      Assert.eval(this.counts.contains(lastLevel));
      int count = this.counts.remove(lastLevel);
      Assert.eval(count > 0);

      count--;
      if (count > 0) {
        this.counts.put(lastLevel, count);
        return false;
      }

      // Last hold at this level: clear its bit from the combined level.
      this.level ^= lastLevel;

      if ((lastLevel == LockLevel.READ && (!LockLevel.isWrite(server_level))) || (lastLevel == LockLevel.WRITE)) {
        server_level ^= lastLevel;
      }
      return lastLevel == LockLevel.WRITE || this.level == LockLevel.NIL_LOCK_LEVEL;
    }

    /**
     * HOLDING to WAITING; requires a WRITE hold.
     *
     * @return the server level at the time of the transition
     */
    int goToWaitState() {
      Assert.assertTrue(LockLevel.isWrite(this.level));
      Assert.assertTrue(state == HOLDING);
      this.state = WAITING;
      return this.server_level;
    }

    /**
     * WAITING to PENDING. In any other state the transition is skipped with a warning.
     *
     * @return the server level
     */
    int goToPending() {
      if (this.state != WAITING) {
        logger.warn(this.lockID + ": Ignoring Moving to PENDING since not in WAITING state: current state = "
                    + this.state);
      } else {
        this.state = PENDING;
      }
      return this.server_level;
    }

    /** PENDING to HOLDING; {@code slevel} must equal the recorded server level. */
    void goToHolding(int slevel) {
      Assert.assertTrue(slevel == server_level);
      if (state != PENDING) throw new AssertionError("Attempt to to to HOLDING while not PENDING: " + state);
      this.state = HOLDING;
    }

    public String toString() {
      return "LockHold[" + state + "," + LockLevel.toString(level) + "]";
    }
  }

  /**
   * State of the greedy token for this lock.
   * <p>
   * Transitions: NOT_GREEDY to GREEDY ({@link #add(int)}), GREEDY to RECALLED ({@link #recall(int)}), RECALLED to
   * RECALL_IN_PROGRESS ({@link #startRecallCommit()}), RECALL_IN_PROGRESS to NOT_GREEDY ({@link #recallComplete()}).
   */
  private static class Greediness {
    private static final State NOT_GREEDY         = new State("NOT GREEDY");
    private static final State GREEDY             = new State("GREEDY");
    private static final State RECALLED           = new State("RECALLED");
    private static final State RECALL_IN_PROGRESS = new State("RECALL IN PROGRESS");

    private int                level              = LockLevel.NIL_LOCK_LEVEL;
    private int                recallLevel        = LockLevel.NIL_LOCK_LEVEL;
    private State              state              = NOT_GREEDY;

    /** Adds {@code l} to the greedy level and enters the GREEDY state. */
    void add(int l) {
      this.level |= l;
      state = GREEDY;
    }

    int getLevel() {
      return level;
    }

    int getRecalledLevel() {
      return recallLevel;
    }

    /** GREEDY to RECALLED, remembering the level the recall is for. */
    void recall(int rlevel) {
      Assert.assertTrue(state == GREEDY);
      this.recallLevel |= rlevel;
      state = RECALLED;
    }

    boolean isRead() {
      return LockLevel.isRead(level);
    }

    boolean isReadOnly() {
      return isRead() && !isWrite();
    }

    boolean isWrite() {
      return LockLevel.isWrite(level);
    }

    boolean isUpgrade() {
      return isRead() && isWrite();
    }

    boolean isGreedy() {
      return (state == GREEDY);
    }

    // Not the negation of isGreedy(): false in the RECALLED and RECALL_IN_PROGRESS states as well.
    boolean isNotGreedy() {
      return (state == NOT_GREEDY);
    }

    public String toString() {
      return "Greedy Token [ Lock Level = " + LockLevel.toString(level) + ", Recall Level = "
             + LockLevel.toString(recallLevel) + ", " + state + "]";
    }

    boolean isRecalled() {
      return (state == RECALLED);
    }

    boolean isRecallInProgress() {
      return (state == RECALL_IN_PROGRESS);
    }

    /** RECALLED to RECALL_IN_PROGRESS. */
    void startRecallCommit() {
      Assert.assertTrue(state == RECALLED);
      state = RECALL_IN_PROGRESS;
    }

    /** RECALL_IN_PROGRESS to NOT_GREEDY; both levels are reset to NIL. */
    void recallComplete() {
      Assert.assertTrue(state == RECALL_IN_PROGRESS);
      this.state = NOT_GREEDY;
      this.recallLevel = LockLevel.NIL_LOCK_LEVEL;
      this.level = LockLevel.NIL_LOCK_LEVEL;
    }
  }

  /**
   * Bit set of follow-up steps computed under the monitor by unlock / wait / lock. Two actions are equal when they
   * contain the same bits, which is how unlock() and wait() detect a state change between their two evaluations.
   */
  private static class Action {
    private static final int NIL_ACTION          = 0x00;
    private static final int REMOTE_LOCK_REQUEST = 0x01;
    private static final int RECALL              = 0x02;
    private static final int RECALL_COMMIT       = 0x04;
    private static final int AWARD_GREEDY_LOCKS  = 0x08;
    private static final int SYNCHRONOUS_COMMIT  = 0x10;

    private int              action              = NIL_ACTION;

    void addAction(int a) {
      action |= a;
    }

    boolean doRemoteLockRequest() {
      return ((action & REMOTE_LOCK_REQUEST) == REMOTE_LOCK_REQUEST);
    }

    boolean doRecall() {
      return ((action & RECALL) == RECALL);
    }

    boolean doRecallCommit() {
      return ((action & RECALL_COMMIT) == RECALL_COMMIT);
    }

    boolean doAwardGreedyLocks() {
      return ((action & AWARD_GREEDY_LOCKS) == AWARD_GREEDY_LOCKS);
    }

    boolean doSynchronousCommit() {
      return ((action & SYNCHRONOUS_COMMIT) == SYNCHRONOUS_COMMIT);
    }

    public boolean equals(Object o) {
      if (!(o instanceof Action)) return false;
      return (((Action) o).action == action);
    }

    public int hashCode() {
      return action;
    }

    public String toString() {
      return "Action:[" + getDescription() + "]";
    }

    /** @return the names of the set bits, comma separated, or "NIL_ACTION" when none is set */
    public String getDescription() {
      if (action == NIL_ACTION) { return "NIL_ACTION"; }
      StringBuffer sb = new StringBuffer(" ");
      if (doAwardGreedyLocks()) sb.append("AWARD_GREEDY_LOCKS,");
      if (doRecall()) sb.append("RECALL,");
      if (doRecallCommit()) sb.append("RECALL_COMMIT,");
      if (doRemoteLockRequest()) sb.append("REMOTE_LOCK_REQUEST,");
      if (doSynchronousCommit()) sb.append("SYNCHRONOUS_COMMIT,");
      // Drop the trailing comma.
      sb.setLength(sb.length() - 1);
      return sb.toString();
    }
  }
}