package com.tc.objectserver.lockmanager.impl;

import org.apache.commons.collections.map.ListOrderedMap;

import com.tc.async.api.Sink;
import com.tc.exception.TCInternalError;
import com.tc.logging.TCLogger;
import com.tc.logging.TCLogging;
import com.tc.net.protocol.tcm.ChannelID;
import com.tc.object.lockmanager.api.LockContext;
import com.tc.object.lockmanager.api.LockID;
import com.tc.object.lockmanager.api.LockLevel;
import com.tc.object.lockmanager.api.ServerThreadID;
import com.tc.object.lockmanager.api.ThreadID;
import com.tc.object.lockmanager.api.WaitTimer;
import com.tc.object.lockmanager.api.WaitTimerCallback;
import com.tc.object.net.DSOChannelManager;
import com.tc.object.tx.WaitInvocation;
import com.tc.objectserver.context.LockResponseContext;
import com.tc.objectserver.lockmanager.api.LockEventListener;
import com.tc.objectserver.lockmanager.api.LockHolder;
import com.tc.objectserver.lockmanager.api.LockMBean;
import com.tc.objectserver.lockmanager.api.LockWaitContext;
import com.tc.objectserver.lockmanager.api.NotifiedWaiters;
import com.tc.objectserver.lockmanager.api.ServerLockRequest;
import com.tc.objectserver.lockmanager.api.TCIllegalMonitorStateException;
import com.tc.objectserver.lockmanager.api.Waiter;
import com.tc.util.Assert;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;

/**
 * Server-side state for a single lock, identified by a {@link LockID}.
 * <p>
 * The lock tracks five collections:
 * <ul>
 * <li><code>holders</code> - current holders, keyed by {@link ServerThreadContext};</li>
 * <li><code>greedyHolders</code> - the subset of holders that were awarded the lock greedily, keyed by
 * {@link ChannelID};</li>
 * <li><code>pendingLockRequests</code> - queued requests that could not be granted immediately;</li>
 * <li><code>pendingLockUpgrades</code> - queued WRITE requests from threads that already hold READ;</li>
 * <li><code>waiters</code> - threads in the wait set, in insertion order, keyed by {@link ServerThreadContext}.</li>
 * </ul>
 * Most state-changing methods are <code>synchronized</code> on this instance. A few accessors (
 * {@link #hasGreedyHolders()}, {@link #getHoldersCollection()}, {@link #notifyStarted(WaitTimerCallback, WaitTimer)},
 * {@link #setLockPolicy(int)}) are not; NOTE(review): presumably their callers already hold an appropriate monitor -
 * confirm before relying on them from new call sites.
 */
public class Lock {
  private static final TCLogger            logger                = TCLogging.getLogger(Lock.class);

  /** Placeholder instance whose {@link #isNull()} returns <code>true</code>; mutating operations assert against it. */
  public final static Lock                 NULL_LOCK             = new Lock(LockID.NULL_ID, 0,
                                                                            new LockEventListener[] {}, true,
                                                                            LockManagerImpl.ALTRUISTIC_LOCK_POLICY,
                                                                            ServerThreadContextFactory.DEFAULT_FACTORY);

  /** Lock level value of a holder that has both READ and WRITE, i.e. a read lock upgraded to write. */
  private static final int                 UPGRADE               = LockLevel.READ | LockLevel.WRITE;

  private final LockEventListener[]        listeners;
  /** ChannelID -> Holder for clients that were awarded this lock greedily. */
  private final Map                        greedyHolders         = new HashMap();
  /** ServerThreadContext -> Holder for every current holder. */
  private final Map                        holders               = new HashMap();
  /** Queue of Request objects waiting for the lock. */
  private final List                       pendingLockRequests   = new LinkedList();
  /** Queue of Request objects waiting to upgrade READ to WRITE. */
  private final List                       pendingLockUpgrades   = new LinkedList();
  /** ServerThreadContext -> LockWaitContext, in the order the threads entered the wait set. */
  private final ListOrderedMap             waiters               = new ListOrderedMap();
  /** LockWaitContext -> TimerTask for waits that have a scheduled timeout. */
  private final Map                        waitTimers            = new HashMap();
  private final LockID                     lockID;
  private final long                       timeout;
  private final boolean                    isNull;
  /** Level of the most recent award/removal; see {@link #awardLock} and {@link #removeCurrentHold}. */
  private int                              level;
  /** True once a recall message has been sent to the greedy holders and until the last greedy holder is removed. */
  private boolean                          recalled              = false;

  private int                              lockPolicy;
  private final ServerThreadContextFactory threadContextFactory;

  /**
   * Creates the lock and immediately issues a lock request on behalf of <code>txn</code>.
   */
  Lock(LockID lockID, ServerThreadContext txn, int lockLevel, Sink lockResponseSink, long timeout,
       LockEventListener[] listeners, int lockPolicy, ServerThreadContextFactory threadContextFactory) {
    this(lockID, timeout, listeners, false, lockPolicy, threadContextFactory);
    requestLock(txn, lockLevel, lockResponseSink);
  }

  /**
   * Creates the lock without requesting it. The <code>txn</code> argument is not used by this constructor; it is
   * retained so existing callers keep compiling.
   */
  Lock(LockID lockID, ServerThreadContext txn, long timeout, LockEventListener[] listeners, int lockPolicy,
       ServerThreadContextFactory threadContextFactory) {
    this(lockID, timeout, listeners, false, lockPolicy, threadContextFactory);
  }

  /**
   * Creates a lock with the altruistic policy and the default thread context factory.
   */
  Lock(LockID lockID, long timeout, LockEventListener[] listeners) {
    this(lockID, timeout, listeners, false, LockManagerImpl.ALTRUISTIC_LOCK_POLICY,
         ServerThreadContextFactory.DEFAULT_FACTORY);
  }

  private Lock(LockID lockID, long timeout, LockEventListener[] listeners, boolean isNull, int lockPolicy,
               ServerThreadContextFactory threadContextFactory) {
    this.lockID = lockID;
    this.listeners = listeners;
    this.timeout = timeout;
    this.isNull = isNull;
    this.lockPolicy = lockPolicy;
    this.threadContextFactory = threadContextFactory;
  }

  /** Builds a {@link LockResponseContext} of type <code>LOCK_NOT_AWARDED</code> addressed to the given thread. */
  static LockResponseContext createLockRejectedResponseContext(LockID lockID, ServerThreadID threadID, int level) {
    return new LockResponseContext(lockID, threadID.getChannelID(), threadID.getClientThreadID(), level,
                                   LockResponseContext.LOCK_NOT_AWARDED);
  }

  /** Builds a {@link LockResponseContext} of type <code>LOCK_AWARD</code> addressed to the given thread. */
  static LockResponseContext createLockAwardResponseContext(LockID lockID, ServerThreadID threadID, int level) {
    return new LockResponseContext(lockID, threadID.getChannelID(), threadID.getClientThreadID(), level,
                                   LockResponseContext.LOCK_AWARD);
  }

  /** Builds a {@link LockResponseContext} of type <code>LOCK_RECALL</code> addressed to the given thread. */
  static LockResponseContext createLockRecallResponseContext(LockID lockID, ServerThreadID threadID, int level) {
    return new LockResponseContext(lockID, threadID.getChannelID(), threadID.getClientThreadID(), level,
                                   LockResponseContext.LOCK_RECALL);
  }

  /** Builds a {@link LockResponseContext} of type <code>LOCK_WAIT_TIMEOUT</code> addressed to the given thread. */
  static LockResponseContext createLockWaitTimeoutResponseContext(LockID lockID, ServerThreadID threadID, int level) {
    return new LockResponseContext(lockID, threadID.getChannelID(), threadID.getClientThreadID(), level,
                                   LockResponseContext.LOCK_WAIT_TIMEOUT);
  }

  /** Builds a {@link LockResponseContext} of type <code>LOCK_INFO</code> carrying a snapshot of the lock's queues. */
  static LockResponseContext createLockQueriedResponseContext(LockID lockID, ServerThreadID threadID, int level,
                                                              int lockRequestQueueLength, int lockUpgradeQueueLength,
                                                              Collection greedyHolders, Collection holders,
                                                              Collection waiters) {
    return new LockResponseContext(lockID, threadID.getChannelID(), threadID.getClientThreadID(), level,
                                   lockRequestQueueLength, lockUpgradeQueueLength, greedyHolders, holders, waiters,
                                   LockResponseContext.LOCK_INFO);
  }

  /**
   * Produces a management snapshot of this lock: its holders, pending requests, pending upgrades and waiters.
   *
   * @param channelManager used to resolve each {@link ChannelID} to a channel address
   * @return a new {@link LockMBean} describing the current state
   */
  synchronized LockMBean getMBean(DSOChannelManager channelManager) {
    LockHolder[] holds = new LockHolder[this.holders.size()];
    Waiter[] waits = new Waiter[this.waiters.size()];

    int count = 0;
    for (Iterator i = this.holders.values().iterator(); i.hasNext();) {
      Holder h = (Holder) i.next();
      ChannelID cid = h.getChannelID();
      holds[count++] = new LockHolder(cid, channelManager.getChannelAddress(cid), h.getThreadID(), h.getLockLevel(),
                                      h.getTimestamp());
    }

    ServerLockRequest[] reqs = toServerLockRequests(this.pendingLockRequests, channelManager);
    ServerLockRequest[] upgrades = toServerLockRequests(this.pendingLockUpgrades, channelManager);

    count = 0;
    for (Iterator i = this.waiters.values().iterator(); i.hasNext();) {
      LockWaitContext wc = (LockWaitContext) i.next();
      ChannelID cid = wc.getChannelID();
      waits[count++] = new Waiter(cid, channelManager.getChannelAddress(cid), wc.getThreadID(),
                                  wc.getWaitInvocation(), wc.getTimestamp());
    }

    return new LockMBeanImpl(lockID, holds, reqs, upgrades, waits);
  }

  /** Converts a list of Request objects into the {@link ServerLockRequest} array exposed through the MBean. */
  private static ServerLockRequest[] toServerLockRequests(List requests, DSOChannelManager channelManager) {
    ServerLockRequest[] rv = new ServerLockRequest[requests.size()];
    int count = 0;
    for (Iterator i = requests.iterator(); i.hasNext();) {
      Request r = (Request) i.next();
      ChannelID cid = r.getRequesterID();
      rv[count++] = new ServerLockRequest(cid, channelManager.getChannelAddress(cid), r.getSourceID(),
                                          r.getLockLevel(), r.getTimestamp());
    }
    return rv;
  }

  /**
   * Sends a <code>LOCK_INFO</code> response describing the current level, queue lengths, greedy holders, holders and
   * waiters to <code>lockResponseSink</code>.
   * <p>
   * The same response is sent whether or not the lock currently has greedy holders.
   */
  synchronized void queryLock(ServerThreadContext txn, Sink lockResponseSink) {
    lockResponseSink.add(createLockQueriedResponseContext(this.lockID, txn.getId(), this.level,
                                                          this.pendingLockRequests.size(),
                                                          this.pendingLockUpgrades.size(),
                                                          this.greedyHolders.values(), this.holders.values(),
                                                          this.waiters.values()));
  }

  /**
   * Non-blocking request: if the lock cannot be considered for an immediate award the requester is sent a
   * <code>LOCK_NOT_AWARDED</code> response instead of being queued.
   *
   * @return <code>true</code> if the lock was awarded
   */
  synchronized boolean tryRequestLock(ServerThreadContext txn, int requestedLockLevel, Sink lockResponseSink) {
    return requestLock(txn, requestedLockLevel, lockResponseSink, true);
  }

  /**
   * Blocking request: the lock is either awarded immediately or the request is queued.
   *
   * @return <code>true</code> if the lock was awarded immediately
   */
  synchronized boolean requestLock(ServerThreadContext txn, int requestedLockLevel, Sink lockResponseSink) {
    return requestLock(txn, requestedLockLevel, lockResponseSink, false);
  }

  /**
   * Core request logic shared by {@link #requestLock(ServerThreadContext, int, Sink)} and
   * {@link #tryRequestLock(ServerThreadContext, int, Sink)}.
   *
   * @param noBlock when <code>true</code>, a requester that is not already a holder is rejected outright if there are
   *        any holders or greedy holders
   * @return <code>true</code> if the lock was awarded by this call, <code>false</code> if the request was rejected,
   *         ignored or queued
   * @throws AssertionError if the requester already holds the requested level or is in the wait set
   */
  private synchronized boolean requestLock(ServerThreadContext txn, int requestedLockLevel, Sink lockResponseSink,
                                           boolean noBlock) {
    Holder holder = getHolder(txn);
    if (noBlock && holder == null && (getHoldersCount() > 0 || hasGreedyHolders())) {
      cannotAwardAndRespond(txn, requestedLockLevel, lockResponseSink);
      return false;
    }

    if (holder != null) {
      // Any overlap between the held level bits and the requested level bits means a duplicate request.
      if (LockLevel.NIL_LOCK_LEVEL != (holder.getLockLevel() & requestedLockLevel)) {
        throw new AssertionError("Client requesting already held lock! holder=" + holder + ", lock=" + this);
      }
    }
    if (waiters.containsKey(txn)) {
      throw new AssertionError("Attempt to request a lock in a Thread "
                               + "that is already part of the wait set. lock = " + this);
    }

    if (isPolicyGreedy()) {
      if (canAwardGreedilyOnTheClient(txn, requestedLockLevel)) {
        // The requesting client already holds a compatible greedy lock, so nothing is awarded or queued here.
        logger.debug(lockID + " : Lock.requestLock() : Ignoring the Lock request(" + txn + ","
                     + LockLevel.toString(requestedLockLevel)
                     + ") message from the a client that has the lock greedily.");
        return false;
      } else if (recalled) {
        // A recall is already in flight: queue the request unless the requester's channel is a greedy holder.
        if (!holdsGreedyLock(txn)) {
          addPending(txn, requestedLockLevel, lockResponseSink);
        }
        return false;
      }
    }

    if ((getHoldersCount() == 0) || ((!hasPending()) && ((requestedLockLevel == LockLevel.READ) && this.isRead()))) {
      // Free lock, or a READ request joining existing readers with nothing queued.
      if (isPolicyGreedy() && (requestedLockLevel != UPGRADE)
          && ((requestedLockLevel == LockLevel.READ) || (getWaiterCount() == 0))) {
        awardGreedyAndRespond(txn, requestedLockLevel, lockResponseSink);
      } else {
        awardAndRespond(txn, requestedLockLevel, lockResponseSink);
      }
    } else if ((getHoldersCount() == 1) && holdsReadLock(txn) && LockLevel.isWrite(requestedLockLevel)) {
      // The sole holder is the requester and it holds READ: upgrade in place.
      if (isPolicyGreedy() && isGreedyRequest(txn)) {
        requestedLockLevel = LockLevel.makeGreedy(requestedLockLevel);
      }
      awardAndRespond(txn, requestedLockLevel, lockResponseSink);
    } else {
      if (isPolicyGreedy() && hasGreedyHolders()) {
        recall(requestedLockLevel);
      }
      if (!holdsGreedyLock(txn)) {
        addPending(txn, requestedLockLevel, lockResponseSink);
      }
      return false;
    }

    return true;
  }

  /**
   * Records <code>txn</code> as a holder at <code>lockLevel</code> without sending a response. If a READ is recorded
   * and the same thread has a WRITE request in the pending queue, that request is moved to the upgrade queue.
   *
   * @throws AssertionError if a WRITE hold is reported while this lock's current level is not a write level
   */
  synchronized void addRecalledHolder(ServerThreadContext txn, int lockLevel) {
    if (!LockLevel.isWrite(level) && LockLevel.isWrite(lockLevel)) {
      throw new AssertionError("Client issued a WRITE lock without holding a GREEDY WRITE !");
    }
    awardLock(txn, lockLevel);
    if (LockLevel.isRead(lockLevel) && pendingLockRequests.size() > 0) {
      for (Iterator iter = pendingLockRequests.iterator(); iter.hasNext();) {
        Request request = (Request) iter.next();
        if (request.getThreadContext().equals(txn) && LockLevel.isWrite(request.getLockLevel())) {
          // Only the first matching request is moved.
          iter.remove();
          pendingLockUpgrades.add(request);
          break;
        }
      }
    }
  }

  /** Queues a request via {@link #addPending(ServerThreadContext, int, Sink)}. */
  synchronized void addRecalledPendingRequest(ServerThreadContext txn, int lockLevel, Sink lockResponseSink) {
    addPending(txn, lockLevel, lockResponseSink);
  }

  /**
   * Sends a <code>LOCK_RECALL</code> response to every greedy holder, unless a recall has already been issued.
   * <code>recalled</code> is only set when at least one greedy holder exists.
   */
  private synchronized void recall(int recallLevel) {
    if (recalled) { return; }
    for (Iterator i = greedyHolders.values().iterator(); i.hasNext();) {
      Holder holder = (Holder) i.next();
      holder.getSink().add(
                           createLockRecallResponseContext(holder.getLockID(), holder.getThreadContext().getId(),
                                                           recallLevel));
      recalled = true;
    }
  }

  /** A request is "greedy" when its client thread id is {@link ThreadID#VM_ID}. */
  private boolean isGreedyRequest(ServerThreadContext txn) {
    return (txn.getId().getClientThreadID().equals(ThreadID.VM_ID));
  }

  private boolean isPolicyGreedy() {
    return lockPolicy == LockManagerImpl.GREEDY_LOCK_POLICY;
  }

  int getLockPolicy() {
    return lockPolicy;
  }

  /**
   * Changes the lock policy. Has no effect on the null lock or when the policy is unchanged. Switching to a
   * non-greedy policy triggers a WRITE-level recall of the greedy holders.
   */
  void setLockPolicy(int newPolicy) {
    if (!isNull() && newPolicy != lockPolicy) {
      this.lockPolicy = newPolicy;
      if (!isPolicyGreedy()) {
        recall(LockLevel.WRITE);
      }
    }
  }

  /**
   * Awards the lock greedily: the hold is recorded against the requesting channel's VM-level thread context (see
   * {@link #getClientVMContext}) at the greedy form of <code>requestedLockLevel</code>, and the holder is registered
   * in <code>greedyHolders</code>.
   */
  private void awardGreedyAndRespond(ServerThreadContext txn, int requestedLockLevel, Sink lockResponseSink) {
    final ServerThreadContext clientTx = getClientVMContext(txn);
    final int greedyLevel = LockLevel.makeGreedy(requestedLockLevel);

    ChannelID ch = txn.getId().getChannelID();
    // Drop per-thread holds and satisfied pending requests from this channel before the channel-wide award.
    checkAndClearStateOnGreedyAward(ch, requestedLockLevel);
    awardAndRespond(clientTx, greedyLevel, lockResponseSink);
    Holder holder = getHolder(clientTx);
    // The sink is kept on the holder so that recall() can reach the client later.
    holder.setSink(lockResponseSink);
    greedyHolders.put(ch, holder);
    clearWaitingOn(txn);
  }

  private void cannotAwardAndRespond(ServerThreadContext txn, int requestedLockLevel, Sink lockResponseSink) {
    lockResponseSink.add(createLockRejectedResponseContext(this.lockID, txn.getId(), requestedLockLevel));
  }

  private void awardAndRespond(ServerThreadContext txn, int requestedLockLevel, Sink lockResponseSink) {
    awardLock(txn, requestedLockLevel);
    lockResponseSink.add(createLockAwardResponseContext(this.lockID, txn.getId(), requestedLockLevel));
  }

  /**
   * Moves one waiter (or all waiters when <code>all</code> is true) from the wait set to the pending request queue,
   * cancelling any wait timer, and records each moved waiter in <code>addNotifiedWaitersTo</code>. Waiters are taken
   * in the order they entered the wait set.
   *
   * @throws TCIllegalMonitorStateException if the lock is not in a legal state for notify (see
   *         {@link #checkLegalWaitNotifyState})
   */
  synchronized void notify(ServerThreadContext txn, boolean all, NotifiedWaiters addNotifiedWaitersTo)
      throws TCIllegalMonitorStateException {
    if (waiters.containsKey(txn)) { throw Assert.failure("Can't notify self: " + txn); }
    checkLegalWaitNotifyState(txn);

    if (waiters.size() > 0) {
      final int numToNotify = all ? waiters.size() : 1;
      for (int i = 0; i < numToNotify; i++) {
        // ListOrderedMap.remove(int) removes by position and returns the mapped value.
        LockWaitContext wait = (LockWaitContext) waiters.remove(0);
        removeAndCancelWaitTimer(wait);
        createPendingFromWaiter(wait);
        addNotifiedWaitersTo.addNotification(new LockContext(lockID, wait.getChannelID(), wait.getThreadID(),
                                                             wait.lockLevel()));
      }
    }
  }

  /**
   * Removes <code>txn</code> from the wait set and queues a pending request for it. Logs a warning and does nothing
   * if <code>txn</code> is not waiting.
   */
  synchronized void interrupt(ServerThreadContext txn) {
    if (!waiters.containsKey(txn)) {
      logger.warn("Cannot interrupt: " + txn + " is not waiting.");
      return;
    }
    LockWaitContext wait = (LockWaitContext) waiters.remove(txn);
    removeAndCancelWaitTimer(wait);
    createPendingFromWaiter(wait);
  }

  private void removeAndCancelWaitTimer(LockWaitContext wait) {
    TimerTask task = (TimerTask) waitTimers.remove(wait);
    if (task != null) task.cancel();
  }

  /**
   * Appends a pending request built from a former waiter and, under the greedy policy, recalls any greedy holders at
   * WRITE level.
   */
  private void createPendingFromWaiter(LockWaitContext wait) {
    Request request = new Request(((LockWaitContextImpl) wait).getThreadContext(), wait.lockLevel(),
                                  wait.getLockResponseSink());
    pendingLockRequests.add(request);
    if (isPolicyGreedy() && hasGreedyHolders()) {
      recall(LockLevel.WRITE);
    }
  }

  /**
   * Handles expiry of a timed wait. If the thread is still in the wait set it is removed, sent a
   * <code>LOCK_WAIT_TIMEOUT</code> response, and then either re-awarded the lock (when there are no holders) or
   * queued as a pending request. Does nothing if the thread is no longer waiting.
   */
  synchronized void waitTimeout(LockWaitContext context) {
    ServerThreadContext txn = ((LockWaitContextImpl) context).getThreadContext();
    Object removed = waiters.remove(txn);

    if (removed != null) {
      waitTimers.remove(context);
      Sink lockResponseSink = context.getLockResponseSink();
      int lockLevel = context.lockLevel();

      lockResponseSink.add(createLockWaitTimeoutResponseContext(this.lockID, txn.getId(), lockLevel));

      if (holders.size() == 0) {
        if (isPolicyGreedy() && (getWaiterCount() == 0)) {
          awardGreedyAndRespond(txn, lockLevel, lockResponseSink);
        } else {
          awardAndRespond(txn, lockLevel, lockResponseSink);
        }
      } else {
        createPendingFromWaiter(context);
      }
    }
  }

  /**
   * Puts <code>txn</code> into the wait set, schedules its timeout (if any), releases its hold on the lock and
   * hands the lock to the next pending request.
   *
   * @throws TCIllegalMonitorStateException if the lock is not in a legal state for wait (see
   *         {@link #checkLegalWaitNotifyState})
   */
  synchronized void wait(ServerThreadContext txn, WaitTimer waitTimer, WaitInvocation call, WaitTimerCallback callback,
                         Sink lockResponseSink) throws TCIllegalMonitorStateException {
    if (waiters.containsKey(txn)) throw Assert.failure("Already in wait set: " + txn);
    checkLegalWaitNotifyState(txn);

    Holder current = getHolder(txn);
    Assert.assertNotNull(current);
    boolean isUpgrade = current.isUpgrade();

    LockWaitContext waitContext = new LockWaitContextImpl(txn, this, call, current.getLockLevel(), lockResponseSink);
    waiters.put(txn, waitContext);

    scheduleWait(callback, waitTimer, waitContext);
    txn.setWaitingOn(this);
    removeCurrentHold(txn);

    if (isUpgrade) {
      // For an upgraded holder the first removeCurrentHold() only strips the WRITE level; the second call removes
      // the remaining hold.
      removeCurrentHold(txn);
    }

    nextPending();
  }

  /**
   * Adds <code>txn</code> to the wait set and schedules its timeout, unless it is already waiting or already has an
   * equal request in the pending queue, in which case the call is logged at debug level and ignored.
   */
  synchronized void addRecalledWaiter(ServerThreadContext txn, WaitInvocation call, int lockLevel,
                                      Sink lockResponseSink, WaitTimer waitTimer, WaitTimerCallback callback) {
    LockWaitContext waitContext = new LockWaitContextImpl(txn, this, call, lockLevel, lockResponseSink);
    if (waiters.containsKey(txn)) {
      logger.debug("addRecalledWaiter(): Ignoring " + waitContext + " as it is already in waiters list.");
      return;
    }
    Request request = new Request(txn, lockLevel, lockResponseSink);
    if (pendingLockRequests.contains(request)) {
      logger.debug("addRecalledWaiter(): Ignoring " + waitContext + " as it is already in pending list.");
      return;
    }
    waiters.put(txn, waitContext);
    scheduleWait(callback, waitTimer, waitContext);
  }

  /**
   * Re-inserts <code>txn</code> into the wait set without scheduling a timer (timers for existing waiters are
   * scheduled by {@link #notifyStarted(WaitTimerCallback, WaitTimer)}).
   */
  synchronized void reestablishWait(ServerThreadContext txn, WaitInvocation call, int lockLevel, Sink lockResponseSink) {
    LockWaitContext waitContext = new LockWaitContextImpl(txn, this, call, lockLevel, lockResponseSink);
    Object old = waiters.put(txn, waitContext);
    if (old != null) throw Assert.failure("Already in wait set: " + txn);
  }

  /**
   * Re-records a hold without sending a response. A request from a VM-level thread context is recorded as a greedy
   * hold.
   *
   * @throws AssertionError if the hold conflicts with existing holders or the thread is in the wait set
   */
  synchronized void reestablishLock(ServerThreadContext threadContext, int requestedLevel, Sink lockResponseSink) {
    if ((LockLevel.isWrite(requestedLevel) && holders.size() != 0)
        || (LockLevel.isRead(requestedLevel) && LockLevel.isWrite(this.level))) {
      throw new AssertionError("Lock " + this + " already held by other Holder. Can't grant to " + threadContext
                               + LockLevel.toString(requestedLevel));

    }
    if (waiters.get(threadContext) != null) {
      throw new AssertionError("Thread " + threadContext + "is already in Wait state for Lock " + this
                               + ". Can't grant Lock Hold !");
    }
    if (isGreedyRequest(threadContext)) {
      int greedyLevel = LockLevel.makeGreedy(requestedLevel);
      ChannelID ch = threadContext.getId().getChannelID();
      awardLock(threadContext, greedyLevel);
      Holder holder = getHolder(threadContext);
      holder.setSink(lockResponseSink);
      greedyHolders.put(ch, holder);
    } else {
      awardLock(threadContext, requestedLevel);
    }
  }

  /** Schedules the wait timeout and remembers the resulting task; the timer may return null, which is not stored. */
  private void scheduleWait(WaitTimerCallback callback, WaitTimer waitTimer, LockWaitContext waitContext) {
    final TimerTask timer = waitTimer.scheduleTimer(callback, waitContext.getWaitInvocation(), waitContext);
    if (timer != null) {
      waitTimers.put(waitContext, timer);
    }
  }

  /**
   * Verifies that wait/notify is legal: exactly one holder, the lock's current level is a write level, and the
   * holder is either <code>threadContext</code> itself or the VM-level context of its channel.
   *
   * @throws TCIllegalMonitorStateException if any of those conditions does not hold
   */
  private void checkLegalWaitNotifyState(ServerThreadContext threadContext) throws TCIllegalMonitorStateException {
    Assert.assertFalse(isNull());

    final int holdersSize = holders.size();
    if (holdersSize != 1) { throw new TCIllegalMonitorStateException("Invalid holder set size: " + holdersSize); }

    final int currentLevel = this.level;
    if (!LockLevel.isWrite(currentLevel)) { throw new TCIllegalMonitorStateException("Incorrect lock level: "
                                                                                     + LockLevel.toString(currentLevel)); }

    Holder holder = getHolder(threadContext);
    if (holder == null) {
      // The lock may be held greedily on behalf of the whole client VM.
      holder = getHolder(getClientVMContext(threadContext));
    }

    if (holder == null) {
      // Name the lock in the message (the previous text repeated the thread context on both sides).
      throw new TCIllegalMonitorStateException(threadContext + " is not the current lock holder for: " + lockID);
    }
  }

  /** Returns the thread context representing the whole client VM ({@link ThreadID#VM_ID}) on the same channel. */
  private ServerThreadContext getClientVMContext(ServerThreadContext threadContext) {
    return threadContextFactory.getOrCreate(threadContext.getId().getChannelID(), ThreadID.VM_ID);
  }

  public synchronized int getHoldersCount() {
    return holders.size();
  }

  public synchronized int getPendingCount() {
    return pendingLockRequests.size();
  }

  public synchronized int getPendingUpgradeCount() {
    return pendingLockUpgrades.size();
  }

  /** Returns an unmodifiable live view of the current holders (not a copy). */
  Collection getHoldersCollection() {
    return Collections.unmodifiableCollection(this.holders.values());
  }

  /**
   * Multi-line dump of the lock id, level, holders, wait set, pending requests and pending upgrades. Never throws:
   * any failure while building the string is printed and reported in the returned text.
   */
  public synchronized String toString() {
    try {
      StringBuffer rv = new StringBuffer();

      rv.append(lockID).append(", ").append("Level: ").append(LockLevel.toString(this.level)).append("\r\n");

      rv.append("Holders (").append(holders.size()).append(")\r\n");
      for (Iterator iter = holders.values().iterator(); iter.hasNext();) {
        rv.append('\t').append(iter.next().toString()).append("\r\n");
      }

      rv.append("Wait Set (").append(waiters.size()).append(")\r\n");
      for (Iterator iter = waiters.values().iterator(); iter.hasNext();) {
        rv.append('\t').append(iter.next().toString()).append("\r\n");
      }

      rv.append("Pending lock requests (").append(pendingLockRequests.size()).append(")\r\n");
      for (Iterator iter = pendingLockRequests.iterator(); iter.hasNext();) {
        rv.append('\t').append(iter.next().toString()).append("\r\n");
      }

      rv.append("Pending lock upgrades (").append(pendingLockUpgrades.size()).append(")\r\n");
      for (Iterator iter = pendingLockUpgrades.iterator(); iter.hasNext();) {
        rv.append('\t').append(iter.next().toString()).append("\r\n");
      }

      return rv.toString();
    } catch (Throwable t) {
      t.printStackTrace();
      return "Exception in toString(): " + t.getMessage();
    }
  }

  /**
   * Records a hold for <code>threadContext</code>. An existing holder has the level added to it; a new holder is
   * created, registered with the thread context and announced to the listeners. In both cases this lock's level
   * becomes the holder's resulting level.
   */
  private void awardLock(ServerThreadContext threadContext, int lockLevel) {
    Assert.assertFalse(isNull());

    Holder holder = getHolder(threadContext);

    if (holder != null) {
      holder.addLockLevel(lockLevel);
      this.level = holder.getLockLevel();
    } else {
      threadContext.addLock(this);
      holder = new Holder(this.lockID, threadContext, this.timeout);
      holder.addLockLevel(lockLevel);
      Object prev = this.holders.put(threadContext, holder);
      Assert.assertNull(prev);
      this.level = holder.getLockLevel();
      notifyAwardLock(holder);
    }

  }

  private void notifyAwardLock(Holder holder) {
    final int waitingCount = this.pendingLockRequests.size();

    for (int i = 0; i < listeners.length; i++) {
      listeners[i].notifyAward(waitingCount, holder);
    }
  }

  /** True only when the current level is exactly {@link LockLevel#READ}. */
  public synchronized boolean isRead() {
    return LockLevel.READ == this.level;
  }

  /** True only when the current level is exactly {@link LockLevel#WRITE}. */
  public synchronized boolean isWrite() {
    return LockLevel.WRITE == this.level;
  }

  /**
   * Queues a request. A WRITE request from a thread that currently holds exactly READ goes to the upgrade queue;
   * anything else goes to the pending queue (duplicates are logged and ignored) and every current holder's listeners
   * are notified. In all non-duplicate cases the thread context is marked as waiting on this lock.
   */
  synchronized void addPending(ServerThreadContext threadContext, int lockLevel, Sink awardLockSink) {
    Assert.assertFalse(isNull());

    Request request = new Request(threadContext, lockLevel, awardLockSink);

    if ((lockLevel == LockLevel.WRITE) && holdsReadLock(threadContext)) {
      this.pendingLockUpgrades.add(request);
    } else {
      if (pendingLockRequests.contains(request)) {
        logger.debug("Ignoring existing Request " + request + " in Lock " + lockID);
        return;
      }

      this.pendingLockRequests.add(request);
      for (Iterator currentHolders = holders.values().iterator(); currentHolders.hasNext();) {
        Holder holder = (Holder) currentHolders.next();
        notifyAddPending(holder);
      }
    }

    threadContext.setWaitingOn(this);
  }

  /** True when <code>threadContext</code> is a holder whose level is exactly {@link LockLevel#READ}. */
  private boolean holdsReadLock(ServerThreadContext threadContext) {
    Holder holder = getHolder(threadContext);
    if (holder != null) { return holder.getLockLevel() == LockLevel.READ; }
    return false;
  }

  private Holder getHolder(ServerThreadContext threadContext) {
    return (Holder) this.holders.get(threadContext);
  }

  private void notifyAddPending(Holder holder) {
    final int waitingCount = this.pendingLockRequests.size();

    for (int i = 0; i < this.listeners.length; i++) {
      this.listeners[i].notifyAddPending(waitingCount, holder);
    }
  }

  synchronized int getWaiterCount() {
    return this.waiters.size();
  }

  synchronized boolean hasPending() {
    return pendingLockRequests.size() > 0 || pendingLockUpgrades.size() > 0;
  }

  synchronized boolean hasWaiting() {
    return this.waiters.size() > 0;
  }

  boolean hasGreedyHolders() {
    return this.greedyHolders.size() > 0;
  }

  synchronized boolean hasWaiting(ServerThreadContext threadContext) {
    return (this.waiters.get(threadContext) != null);
  }

  public LockID getLockID() {
    return lockID;
  }

  public boolean isNull() {
    return this.isNull;
  }

  /** Hash code is derived from the lock id only, consistent with {@link #equals(Object)}. */
  public int hashCode() {
    return this.lockID.hashCode();
  }

  /** Two locks are equal when their lock ids are equal. */
  public boolean equals(Object obj) {
    if (obj instanceof Lock) {
      Lock other = (Lock) obj;
      return this.lockID.equals(other.lockID);
    }
    return false;
  }

  /**
   * Grants the lock to whatever is next in line, if anything can be granted:
   * <ul>
   * <li>with exactly one holder and a queued upgrade, the head upgrade is granted if its thread holds READ;</li>
   * <li>with no holders, the head pending request decides: WRITE/UPGRADE grants that one request, READ grants every
   * queued READ request (see {@link #awardAllReads()}).</li>
   * </ul>
   *
   * @return <code>true</code> when, after processing, the lock has no holders, waiters, pending requests or pending
   *         upgrades
   * @throws TCInternalError if the head pending request carries an unrecognised lock level
   */
  synchronized boolean nextPending() {
    Assert.eval(!isNull());

    if ((holders.size() == 1) && (!pendingLockUpgrades.isEmpty())) {
      Request request = (Request) pendingLockUpgrades.get(0);
      if (holdsReadLock(request.getThreadContext())) {
        pendingLockUpgrades.remove(0);
        grantRequest(request);
      }
    } else if (holders.isEmpty()) {
      if (!pendingLockRequests.isEmpty()) {
        Request request = (Request) pendingLockRequests.get(0);
        int reqLockLevel = request.getLockLevel();
        switch (reqLockLevel) {
          case LockLevel.WRITE: {
            if (isPolicyGreedy() && isAllPendingLockRequestsFromChannel(request.getRequesterID())
                && (getWaiterCount() == 0)) {
              pendingLockRequests.remove(0);
              grantGreedyRequest(request);
              break;
            }
            // Intentional fall-through: a WRITE that cannot be granted greedily is granted like an UPGRADE.
          }
          case UPGRADE: {
            pendingLockRequests.remove(0);
            grantRequest(request);
            break;
          }
          case LockLevel.READ: {
            awardAllReads();
            break;
          }
          default: {
            throw new TCInternalError("Unknown lock level in request: " + reqLockLevel);
          }
        }
      }
    }

    return holders.size() == 0 && this.waiters.size() == 0 && this.pendingLockRequests.size() == 0
           && this.pendingLockUpgrades.size() == 0;
  }

  private void grantGreedyRequest(Request request) {
    ServerThreadContext threadContext = request.getThreadContext();
    awardGreedyAndRespond(threadContext, request.getLockLevel(), request.getLockResponseSink());
    clearWaitingOn(threadContext);
  }

  private void grantRequest(Request request) {
    ServerThreadContext threadContext = request.getThreadContext();
    awardLock(threadContext, request.getLockLevel());
    clearWaitingOn(threadContext);
    request.execute(lockID);
  }

  /**
   * Releases one level of <code>threadContext</code>'s hold.
   * <p>
   * If the holder is an upgrade (READ+WRITE) only the WRITE level is removed and the holder remains. Otherwise the
   * holder is removed entirely, detached from the thread context, removed from the greedy holders if it was a
   * VM-level context, and the listeners are notified of the revoke.
   *
   * @return <code>true</code> only when an upgraded holder was downgraded; <code>false</code> when the hold was
   *         removed entirely or when <code>threadContext</code> was not a holder
   */
  synchronized boolean removeCurrentHold(ServerThreadContext threadContext) {
    Holder holder = getHolder(threadContext);
    if (holder != null) {
      if (holder.isUpgrade()) {
        holder.removeLockLevel(LockLevel.WRITE);
        this.level = holder.getLockLevel();
        return true;
      } else {
        this.holders.remove(threadContext);
        threadContext.removeLock(this);
        threadContextFactory.removeIfClear(threadContext);
        if (isGreedyRequest(threadContext)) {
          removeGreedyHolder(threadContext.getId().getChannelID());
        }
        this.level = (holders.size() == 0 ? LockLevel.NIL_LOCK_LEVEL : LockLevel.READ);
        notifyRevoke(holder);
      }
    }
    return false;
  }

  /**
   * Completes a recall for a greedy (VM-level) holder: removes its hold, re-issues the recall to any remaining
   * greedy holders if no recall was in flight when this method was entered, and processes the next pending request
   * once no recall is outstanding.
   *
   * @return the result of {@link #nextPending()} when no recall is outstanding, otherwise <code>false</code>
   */
  synchronized boolean recallCommit(ServerThreadContext threadContext) {
    Assert.assertTrue(isGreedyRequest(threadContext));
    boolean issueRecall = !recalled;
    removeCurrentHold(threadContext);
    if (issueRecall) {
      recall(LockLevel.WRITE);
    }
    if (recalled == false) { return nextPending(); }
    return false;
  }

  /** Removes a channel from the greedy holders and clears the recalled flag once none remain. */
  private synchronized void removeGreedyHolder(ChannelID channelID) {
    greedyHolders.remove(channelID);
    if (!hasGreedyHolders()) {
      recalled = false;
    }
  }

  private void clearWaitingOn(ServerThreadContext threadContext) {
    threadContext.clearWaitingOn();
    threadContextFactory.removeIfClear(threadContext);
  }

  /**
   * Removes every READ request from the pending queue and grants it. Under the greedy policy, when no WRITE request
   * remains queued, the reads are granted greedily (a request whose channel already holds the lock greedily is just
   * cleared); otherwise they are granted normally.
   */
  synchronized void awardAllReads() {
    List pendingReadLockRequests = new ArrayList(pendingLockRequests.size());
    boolean hasPendingWrites = false;

    for (Iterator i = pendingLockRequests.iterator(); i.hasNext();) {
      Request request = (Request) i.next();
      if (request.getLockLevel() == LockLevel.READ) {
        pendingReadLockRequests.add(request);
        i.remove();
      } else if (!hasPendingWrites && request.getLockLevel() == LockLevel.WRITE) {
        hasPendingWrites = true;
      }
    }

    for (Iterator i = pendingReadLockRequests.iterator(); i.hasNext();) {
      Request request = (Request) i.next();
      if (isPolicyGreedy() && !hasPendingWrites) {
        ServerThreadContext tid = request.getThreadContext();
        if (!holdsGreedyLock(tid)) {
          grantGreedyRequest(request);
        } else {
          // An earlier iteration already awarded this channel greedily; nothing further to send.
          clearWaitingOn(tid);
        }
      } else {
        grantRequest(request);
      }
    }
  }

  /** True when any current holder belongs to channel <code>ch</code>. */
  synchronized boolean holdsSomeLock(ChannelID ch) {
    for (Iterator iter = holders.values().iterator(); iter.hasNext();) {
      Holder holder = (Holder) iter.next();
      if (holder.getChannelID().equals(ch)) { return true; }
    }
    return false;
  }

  /** True when the channel of <code>threadContext</code> is registered as a greedy holder. */
  synchronized boolean holdsGreedyLock(ServerThreadContext threadContext) {
    return (greedyHolders.get(threadContext.getId().getChannelID()) != null);
  }

  /**
   * True when the requester's channel already holds this lock greedily at a write level, or at exactly the requested
   * level.
   */
  synchronized boolean canAwardGreedilyOnTheClient(ServerThreadContext threadContext, int lockLevel) {
    Holder holder = (Holder) greedyHolders.get(threadContext.getId().getChannelID());
    if (holder != null) { return (LockLevel.isWrite(holder.getLockLevel()) || holder.getLockLevel() == lockLevel); }
    return false;
  }

  private void notifyRevoke(Holder holder) {
    for (int i = 0; i < this.listeners.length; i++) {
      this.listeners[i].notifyRevoke(holder);
    }
  }

  /** Schedules wait timers for every context currently in the wait set. */
  void notifyStarted(WaitTimerCallback callback, WaitTimer timer) {
    for (Iterator i = waiters.values().iterator(); i.hasNext();) {
      LockWaitContext ctxt = (LockWaitContext) i.next();
      scheduleWait(callback, timer, ctxt);
    }
  }

  /** True when every pending request (possibly none) was issued from <code>channelId</code>. */
  synchronized boolean isAllPendingLockRequestsFromChannel(ChannelID channelId) {
    for (Iterator i = pendingLockRequests.iterator(); i.hasNext();) {
      Request r = (Request) i.next();
      if (!r.getRequesterID().equals(channelId)) { return false; }
    }
    return true;
  }

  /**
   * Drops every holder, pending upgrade, pending request, waiter and wait timer that belongs to
   * <code>channelId</code>, cancels the affected timers, and removes the channel from the greedy holders.
   * <p>
   * NOTE(review): unlike {@link #removeCurrentHold}, this does not update <code>level</code>, call
   * <code>threadContext.removeLock(this)</code> or notify listeners for the removed holders - presumably the caller
   * handles per-thread cleanup; confirm.
   */
  synchronized void clearStateForChannel(ChannelID channelId) {
    for (Iterator i = holders.values().iterator(); i.hasNext();) {
      Holder holder = (Holder) i.next();
      if (holder.getChannelID().equals(channelId)) {
        i.remove();
      }
    }

    for (Iterator i = pendingLockUpgrades.iterator(); i.hasNext();) {
      Request r = (Request) i.next();
      if (r.getRequesterID().equals(channelId)) {
        i.remove();
      }
    }

    for (Iterator i = pendingLockRequests.iterator(); i.hasNext();) {
      Request r = (Request) i.next();
      if (r.getRequesterID().equals(channelId)) {
        i.remove();
      }
    }

    for (Iterator i = waiters.values().iterator(); i.hasNext();) {
      LockWaitContext wc = (LockWaitContext) i.next();
      if (wc.getChannelID().equals(channelId)) {
        i.remove();
      }
    }

    for (Iterator i = waitTimers.keySet().iterator(); i.hasNext();) {
      LockWaitContext wc = (LockWaitContext) i.next();
      if (wc.getChannelID().equals(channelId)) {
        try {
          TimerTask task = (TimerTask) waitTimers.get(wc);
          task.cancel();
        } finally {
          // Always drop the entry, even if cancelling failed.
          i.remove();
        }
      }
    }
    removeGreedyHolder(channelId);
  }

  /**
   * Prepares for a greedy award to channel <code>ch</code>: removes that channel's existing holders and its pending
   * requests that the greedy award satisfies.
   *
   * @throws AssertionError if a READ is being awarded greedily while the same channel has a non-READ request pending
   */
  synchronized void checkAndClearStateOnGreedyAward(ChannelID ch, int requestedLevel) {
    Assert.assertTrue(pendingLockUpgrades.size() == 0);
    Assert.assertTrue((requestedLevel == LockLevel.READ) || (waiters.size() == 0));

    for (Iterator i = holders.values().iterator(); i.hasNext();) {
      Holder holder = (Holder) i.next();
      if (holder.getChannelID().equals(ch)) {
        i.remove();
      }
    }
    for (Iterator i = pendingLockRequests.iterator(); i.hasNext();) {
      Request r = (Request) i.next();
      if (r.getRequesterID().equals(ch)) {
        if ((requestedLevel == LockLevel.WRITE) || (r.getLockLevel() == requestedLevel)) {
          i.remove();
          ServerThreadContext tid = r.getThreadContext();
          clearWaitingOn(tid);
        } else {
          throw new AssertionError("Issuing READ lock greedily when WRITE pending !");
        }
      }
    }
  }

}