package com.tc.objectserver.lockmanager.impl;

import com.tc.async.api.Sink;
import com.tc.exception.ImplementMe;
import com.tc.logging.CustomerLogging;
import com.tc.logging.TCLogger;
import com.tc.logging.TCLogging;
import com.tc.net.protocol.tcm.ChannelID;
import com.tc.object.lockmanager.api.LockContext;
import com.tc.object.lockmanager.api.LockID;
import com.tc.object.lockmanager.api.LockLevel;
import com.tc.object.lockmanager.api.ThreadID;
import com.tc.object.lockmanager.api.WaitContext;
import com.tc.object.lockmanager.api.WaitTimer;
import com.tc.object.lockmanager.api.WaitTimerCallback;
import com.tc.object.lockmanager.impl.WaitTimerImpl;
import com.tc.object.net.DSOChannelManager;
import com.tc.object.tx.WaitInvocation;
import com.tc.objectserver.lockmanager.api.DeadlockChain;
import com.tc.objectserver.lockmanager.api.DeadlockResults;
import com.tc.objectserver.lockmanager.api.LockEventListener;
import com.tc.objectserver.lockmanager.api.LockMBean;
import com.tc.objectserver.lockmanager.api.LockManager;
import com.tc.objectserver.lockmanager.api.LockManagerMBean;
import com.tc.objectserver.lockmanager.api.LockWaitContext;
import com.tc.objectserver.lockmanager.api.NotifiedWaiters;
import com.tc.objectserver.lockmanager.api.TCIllegalMonitorStateException;
import com.tc.util.Assert;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Lock manager that keeps one {@link Lock} per {@link LockID} in an in-memory table and forwards lock, unlock,
 * wait and notify operations to it on behalf of a (channel, thread) pair.
 * <p>
 * Lifecycle: an instance is created in the STARTING state. While STARTING, locks and waits may be re-established
 * ({@link #reestablishLock}, {@link #reestablishWait}) and ordinary lock requests are queued. {@link #start()} moves
 * the manager to STARTED and replays the queued requests. {@link #stop()} moves it through STOPPING to STOPPED and
 * discards all lock state.
 * <p>
 * Thread-safety: all mutable state ({@code status}, {@code locks}, {@code lockPolicy}, the request queue) is guarded
 * by this object's monitor. Private helpers that are not declared {@code synchronized} must only be called while
 * the monitor is held.
 */
public class LockManagerImpl implements LockManager, LockManagerMBean, WaitTimerCallback {
  private static final TCLogger                   logger                     = TCLogging
                                                                                 .getLogger(LockManagerImpl.class);
  private static final TCLogger                   clogger                    = CustomerLogging.getDSOGenericLogger();

  /** Carried by the error thrown when an operation requires the STARTING state. */
  public static final LockManagerErrorDescription NOT_STARTING_ERROR         = new LockManagerErrorDescription(
                                                                                 "NOT STARTING");
  /** Carried by the error thrown when an operation requires the STARTED state. */
  public static final LockManagerErrorDescription NOT_STARTED_ERROR          = new LockManagerErrorDescription(
                                                                                 "NOT STARTED");
  /** Carried by the error thrown when an operation is not allowed while STARTING. */
  public static final LockManagerErrorDescription IS_STARTING_ERROR          = new LockManagerErrorDescription(
                                                                                 "IS STARTING");
  /** Carried by the error thrown when an operation is not allowed once STOPPED. */
  public static final LockManagerErrorDescription IS_STOPPED_ERROR           = new LockManagerErrorDescription(
                                                                                 "IS STOPPED");

  // Not referenced inside this class; kept because it is part of the public API.
  public static final LockManagerErrorDescription LOCK_ALREADY_GRANTED_ERROR = new LockManagerErrorDescription(
                                                                                 "LOCK ALREADY GRANTED");

  /** Policy value before {@link #start()} or {@link #setLockPolicy(int)} has chosen one. */
  public static final int                         UNINITIALIZED_LOCK_POLICY  = 0x00;
  public static final int                         GREEDY_LOCK_POLICY         = 0x01;
  public static final int                         ALTRUISTIC_LOCK_POLICY     = 0x02;

  private static final State                      STARTING                   = new State("STARTING");
  private static final State                      STARTED                    = new State("STARTED");
  private static final State                      STOPPING                   = new State("STOPPING");
  private static final State                      STOPPED                    = new State("STOPPED");

  // Current lifecycle state; guarded by "this".
  private State                                   status                     = STARTING;
  // LockID -> Lock; guarded by "this".
  private final Map                               locks                      = new HashMap();
  private final LockEventListener                 lockTimer;
  private final DSOChannelManager                 channelManager;
  private final LockEventListener[]               lockListeners;
  private final WaitTimer                         waitTimer;

  // Passed to every Lock that is created. NOTE(review): presumably milliseconds (two minutes) -- confirm in Lock.
  private final long                              lockTimeout                = 1000 * 60 * 2;
  // Guarded by "this".
  private int                                     lockPolicy                 = UNINITIALIZED_LOCK_POLICY;

  // RequestLockContext entries received while STARTING; replayed and cleared by start(). Guarded by "this".
  private final List                              lockRequestQueue           = new ArrayList();
  private final ServerThreadContextFactory        threadContextFactory       = new ServerThreadContextFactory();

  /**
   * Creates a lock manager in the STARTING state.
   *
   * @param channelManager used to check whether a requesting channel is active and to build lock MBeans
   */
  public LockManagerImpl(DSOChannelManager channelManager) {
    this.channelManager = channelManager;

    this.lockTimer = new NullLockTimer();
    this.lockListeners = new LockEventListener[] { this.lockTimer };

    this.waitTimer = new WaitTimerImpl();
  }

  /**
   * Writes the current lock table to {@code System.err}.
   */
  public synchronized void dump() {
    StringBuffer buf = new StringBuffer("LockManager");
    buf.append("locks=" + locks).append("\n");
    buf.append("/LockManager").append("\n");
    System.err.println(buf.toString());
  }

  /**
   * @return the number of locks currently in the lock table
   */
  public synchronized int getLockCount() {
    return this.locks.size();
  }

  /**
   * @return the count reported by the thread context factory
   */
  public synchronized int getThreadContextCount() {
    return this.threadContextFactory.getCount();
  }

  /**
   * Checks that every given lock exists and that the given channel holds it. Does nothing unless the manager is
   * STARTED.
   *
   * @param channelID the channel expected to hold the locks
   * @param lockIDs the locks to check
   * @throws AssertionError if a lock is missing from the table or is not held by {@code channelID}
   */
  public synchronized void verify(ChannelID channelID, LockID[] lockIDs) {
    if (!isStarted()) { return; }
    for (int i = 0; i < lockIDs.length; i++) {
      Lock lock = (Lock) locks.get(lockIDs[i]);
      if (lock == null) {
        String errorMsg = " Lock is not held for " + lockIDs[i] + ". Not by " + channelID
                          + ". Not by anyone. uhm... Nada";
        logger.warn(errorMsg);
        throw new AssertionError(errorMsg);
      }
      if (!lock.holdsSomeLock(channelID)) { throw new AssertionError(" Lock " + lockIDs[i]
                                                                     + " is not held by anyone in " + channelID); }
    }
  }

  /**
   * Re-establishes a lock hold while the manager is STARTING, creating the {@link Lock} entry if it does not exist.
   *
   * @throws LockManagerError if the manager is not in the STARTING state
   */
  public synchronized void reestablishLock(LockID lockID, ChannelID channelID, ThreadID sourceID, int requestedLevel,
                                           Sink lockResponseSink) {
    assertStarting();
    ServerThreadContext threadContext = threadContextFactory.getOrCreate(channelID, sourceID);
    Lock lock = (Lock) this.locks.get(lockID);

    if (lock == null) {
      lock = new Lock(lockID, threadContext, this.lockTimeout, this.lockListeners, this.lockPolicy,
                      threadContextFactory);
      locks.put(lockID, lock);
    }
    lock.reestablishLock(threadContext, requestedLevel, lockResponseSink);
  }

  /**
   * Non-blocking variant of {@link #requestLock(LockID, ChannelID, ThreadID, int, Sink)}: the request is passed on
   * with {@code noBlock == true}.
   *
   * @return see {@link #requestLock(LockID, ChannelID, ThreadID, int, Sink)}
   */
  public synchronized boolean tryRequestLock(LockID lockID, ChannelID channelID, ThreadID sourceID, int requestedLevel,
                                             Sink lockResponseSink) {
    return requestLock(lockID, channelID, sourceID, requestedLevel, lockResponseSink, true);
  }

  /**
   * Common entry point for lock requests.
   * <ul>
   * <li>Inactive channel: the request is dropped and {@code false} is returned.</li>
   * <li>STARTING: the request is queued for replay by {@link #start()} and {@code false} is returned.</li>
   * <li>STOPPING / STOPPED: the request is dropped and {@code false} is returned.</li>
   * <li>STARTED: the request is handed to {@link #basicRequestLock}.</li>
   * </ul>
   */
  private synchronized boolean requestLock(LockID lockID, ChannelID channelID, ThreadID threadID, int requestedLevel,
                                           Sink lockResponseSink, boolean noBlock) {
    if (!channelManager.isActiveID(channelID)) { return false; }
    if (isStarting()) {
      queueRequestLock(lockID, channelID, threadID, requestedLevel, lockResponseSink, noBlock);
      return false;
    }
    if (!isStarted()) { return false; }
    return basicRequestLock(lockID, channelID, threadID, requestedLevel, lockResponseSink, noBlock);
  }

  /**
   * Requests a lock on behalf of the given channel and thread.
   *
   * @return {@code false} if the channel is not active, if the request was queued because the manager is still
   *         STARTING, or if the manager is stopping or stopped; otherwise the result of processing the request
   */
  public synchronized boolean requestLock(LockID lockID, ChannelID channelID, ThreadID sourceID, int requestedLevel,
                                          Sink lockResponseSink) {
    return requestLock(lockID, channelID, sourceID, requestedLevel, lockResponseSink, false);
  }

  /**
   * Processes a lock request against the lock table. Caller must hold this object's monitor.
   *
   * @return the result of {@code Lock.tryRequestLock} / {@code Lock.requestLock} for an existing lock, or
   *         {@code true} when a new {@link Lock} had to be created for the request
   */
  private boolean basicRequestLock(LockID lockID, ChannelID channelID, ThreadID threadID, int requestedLevel,
                                   Sink lockResponseSink, boolean noBlock) {
    ServerThreadContext threadContext = threadContextFactory.getOrCreate(channelID, threadID);
    Lock lock = (Lock) this.locks.get(lockID);

    if (lock == null) {
      // NOTE(review): this constructor presumably grants the lock to the requester -- confirm in Lock.
      lock = new Lock(lockID, threadContext, requestedLevel, lockResponseSink, this.lockTimeout, this.lockListeners,
                      this.lockPolicy, threadContextFactory);
      locks.put(lockID, lock);
      return true;
    }

    if (noBlock) { return lock.tryRequestLock(threadContext, requestedLevel, lockResponseSink); }
    return lock.requestLock(threadContext, requestedLevel, lockResponseSink);
  }

  /**
   * Remembers a lock request received while STARTING so that {@link #start()} can replay it. Caller must hold this
   * object's monitor.
   */
  private void queueRequestLock(LockID lockID, ChannelID channelID, ThreadID threadID, int requestedLevel,
                                Sink lockResponseSink, boolean noBlock) {
    lockRequestQueue
        .add(new RequestLockContext(lockID, channelID, threadID, requestedLevel, lockResponseSink, noBlock));
  }

  /**
   * Forwards a lock query to the lock (or to {@code Lock.NULL_LOCK} if the lock is unknown). Ignored when the
   * manager is stopping or stopped.
   *
   * @throws LockManagerError if the manager is still STARTING
   */
  public synchronized void queryLock(LockID lockID, ChannelID channelID, ThreadID threadID, Sink lockResponseSink) {
    assertNotStarting();
    if (!isStarted()) { return; }

    Lock lock = getLockFor(lockID);
    ServerThreadContext threadContext = threadContextFactory.getOrCreate(channelID, threadID);
    lock.queryLock(threadContext, lockResponseSink);
  }

  /**
   * Forwards an interrupt to the lock (or to {@code Lock.NULL_LOCK} if the lock is unknown). Ignored when the
   * manager is stopping or stopped.
   *
   * @throws LockManagerError if the manager is still STARTING
   */
  public synchronized void interrupt(LockID lockID, ChannelID channelID, ThreadID threadID) {
    assertNotStarting();
    if (!isStarted()) { return; }

    Lock lock = getLockFor(lockID);
    ServerThreadContext threadContext = threadContextFactory.getOrCreate(channelID, threadID);
    lock.interrupt(threadContext);
  }

  /**
   * Releases the hold of the given thread on the given lock. An unknown lock is logged and otherwise ignored. The
   * call is ignored when the manager is stopping or stopped.
   *
   * @throws LockManagerError if the manager is still STARTING
   */
  public synchronized void unlock(LockID id, ChannelID channelID, ThreadID threadID) {
    assertNotStarting();
    if (!isStarted()) { return; }

    Lock l = getLockFor(id);
    if (l.isNull()) {
      logger.warn("An attempt was made to unlock:" + id + " for channelID:" + channelID
                  + " This lock was not held.  This could be do to that node being down so it may not be an error.");
      return;
    }
    basicUnlock(l, threadContextFactory.getOrCreate(channelID, threadID));
  }

  /**
   * Puts the given thread into the wait set of the given lock and wakes up threads blocked on this manager's
   * monitor.
   *
   * @throws LockManagerError if the manager is STOPPED
   * @throws ImplementMe if the lock is unknown or the lock rejects the wait with a
   *         {@link TCIllegalMonitorStateException}
   */
  public synchronized void wait(LockID lid, ChannelID cid, ThreadID tid, WaitInvocation call, Sink lockResponseSink) {
    assertNotStopped();
    Lock lock = (Lock) this.locks.get(lid);
    if (lock == null) { throw new ImplementMe(); }

    ServerThreadContext threadContext = threadContextFactory.getOrCreate(cid, tid);
    try {
      lock.wait(threadContext, waitTimer, call, this, lockResponseSink);
      notifyAll();
    } catch (TCIllegalMonitorStateException e) {
      e.printStackTrace();
      throw new ImplementMe();
    }
  }

  /**
   * Re-establishes a waiter while the manager is STARTING, creating the {@link Lock} entry if it does not exist.
   *
   * @throws LockManagerError if the manager is not in the STARTING state
   */
  public synchronized void reestablishWait(LockID lid, ChannelID cid, ThreadID tid, int lockLevel, WaitInvocation call,
                                           Sink lockResponseSink) {
    assertStarting();
    Lock lock = (Lock) this.locks.get(lid);
    ServerThreadContext threadContext = threadContextFactory.getOrCreate(cid, tid);
    if (lock == null) {
      lock = new Lock(lid, threadContext, this.lockTimeout, this.lockListeners, this.lockPolicy, threadContextFactory);
      locks.put(lid, lock);
    }
    lock.reestablishWait(threadContext, call, lockLevel, lockResponseSink);
  }

  /**
   * Applies the result of a lock recall: registers the holders, waiters and pending requests reported for the
   * channel on the lock and then commits the recall. If the commit reports {@code true} the lock is removed from
   * the table and the channel's {@code ThreadID.VM_ID} context is released if clear.
   *
   * @param lockContexts {@link LockContext} entries describing current holders
   * @param waitContexts {@link WaitContext} entries describing current waiters
   * @param pendingLockContexts {@link LockContext} entries describing pending requests
   * @throws LockManagerError if the manager is still STARTING
   */
  public synchronized void recallCommit(LockID lid, ChannelID cid, Collection lockContexts, Collection waitContexts,
                                        Collection pendingLockContexts, Sink lockResponseSink) {
    assertNotStarting();
    Lock lock = (Lock) this.locks.get(lid);
    Assert.assertNotNull(lock);

    synchronized (lock) {
      for (Iterator i = lockContexts.iterator(); i.hasNext();) {
        LockContext ctxt = (LockContext) i.next();
        ServerThreadContext threadContext = threadContextFactory.getOrCreate(cid, ctxt.getThreadID());
        lock.addRecalledHolder(threadContext, ctxt.getLockLevel());
      }

      for (Iterator i = waitContexts.iterator(); i.hasNext();) {
        WaitContext ctxt = (WaitContext) i.next();
        ServerThreadContext threadContext = threadContextFactory.getOrCreate(cid, ctxt.getThreadID());
        lock.addRecalledWaiter(threadContext, ctxt.getWaitInvocation(), ctxt.getLockLevel(), lockResponseSink,
                               waitTimer, this);
      }

      for (Iterator i = pendingLockContexts.iterator(); i.hasNext();) {
        LockContext ctxt = (LockContext) i.next();
        ServerThreadContext threadContext = threadContextFactory.getOrCreate(cid, ctxt.getThreadID());
        lock.addRecalledPendingRequest(threadContext, ctxt.getLockLevel(), lockResponseSink);
      }

      ServerThreadContext threadContext = threadContextFactory.getOrCreate(cid, ThreadID.VM_ID);
      if (lock.recallCommit(threadContext)) {
        locks.remove(lid);
        threadContextFactory.removeIfClear(threadContext);
      }
    }
  }

  /**
   * {@link WaitTimerCallback} hook. Delivers the timeout to the {@link LockWaitContext}; callbacks that arrive while
   * the manager is not STARTED, or that carry any other object, are logged and dropped.
   */
  public synchronized void waitTimeout(Object callbackObject) {
    if (isStarted() && callbackObject instanceof LockWaitContext) {
      LockWaitContext context = (LockWaitContext) callbackObject;
      context.waitTimeout();
    } else {
      logger.warn("Ignoring wait timeout for : " + callbackObject);
    }
  }

  /**
   * Notifies one or all waiters of the given lock on behalf of the given thread.
   * <p>
   * A call that arrives after the shutdown sequence has begun is logged and ignored, in line with {@link #unlock},
   * {@link #queryLock} and {@link #interrupt}.
   *
   * @param all {@code true} to notify every waiter, {@code false} to notify one
   * @param addNotifiedWaitersTo passed through to the lock's notify call
   * @throws AssertionError if the manager is still STARTING, if the lock is unknown, or if the lock rejects the
   *         notify with a {@link TCIllegalMonitorStateException}
   */
  public synchronized void notify(LockID lid, ChannelID cid, ThreadID tid, boolean all,
                                  NotifiedWaiters addNotifiedWaitersTo) {
    if (!isStarted()) {
      if (isStarting()) { throw new AssertionError("Notify was called before the LockManager was started."); }
      // stop() empties the lock table, so continuing past this point could only end in the
      // "is not present" AssertionError below. Warn and drop the call instead.
      logger.warn("Notify was called after shutdown sequence commenced.");
      return;
    }

    Lock lock = (Lock) this.locks.get(lid);
    if (lock == null) { throw new AssertionError("Lock :" + lid + " is not present !"); }

    ServerThreadContext threadContext = threadContextFactory.getOrCreate(cid, tid);
    try {
      lock.notify(threadContext, all, addNotifiedWaitersTo);
    } catch (TCIllegalMonitorStateException e) {
      e.printStackTrace();
      throw new AssertionError(e);
    }
  }

  /**
   * Removes the thread's hold on the lock. While STARTED it then either awards all reads (when the removed hold is
   * reported as an upgrade) or advances to the next pending request, dropping the lock from the table once
   * {@code nextPending()} reports {@code true}. Finally releases the thread context if it is clear and wakes up
   * threads blocked on this manager's monitor. Caller must hold this object's monitor.
   */
  private void basicUnlock(Lock lock, ServerThreadContext threadContext) {
    boolean wasUpgrade = lock.removeCurrentHold(threadContext);
    if (isStarted()) {
      if (wasUpgrade) {
        lock.awardAllReads();
      } else {
        boolean clear = lock.nextPending();
        if (clear) {
          locks.remove(lock.getLockID());
        }
      }
    }
    threadContextFactory.removeIfClear(threadContext);
    notifyAll();
  }

  /**
   * @return the {@code hasPending()} answer of the lock, or of {@code Lock.NULL_LOCK} if the lock is unknown
   */
  public synchronized boolean hasPending(LockID id) {
    return getLockFor(id).hasPending();
  }

  /**
   * Clears the state held for the given channel from every lock and then clears the channel's thread contexts.
   */
  public synchronized void clearAllLocksFor(ChannelID channelID) {
    // Iterate over a snapshot of the keys: basicUnlock() may remove entries from the lock table.
    HashSet allLocks = new HashSet(locks.keySet());

    for (Iterator i = allLocks.iterator(); i.hasNext();) {
      LockID lid = (LockID) i.next();
      Lock lock = getLockFor(lid);

      if (!lock.isNull()) {
        lock.clearStateForChannel(channelID);

        basicUnlock(lock, ServerThreadContext.NULL_CONTEXT);
      }
    }
    threadContextFactory.clear(channelID);
  }

  /**
   * Looks up a lock without ever returning {@code null}. Caller must hold this object's monitor.
   *
   * @return the lock registered for {@code id}, or {@code Lock.NULL_LOCK} if there is none
   */
  private Lock getLockFor(LockID id) {
    Lock lock = (Lock) locks.get(id);
    if (lock == null) { return Lock.NULL_LOCK; }
    return lock;
  }

  /**
   * Runs a {@link DeadlockDetector} over the current view of the thread contexts and reports to {@code output}.
   */
  public synchronized void scanForDeadlocks(DeadlockResults output) {
    new DeadlockDetector(output).detect(threadContextFactory.getView().iterator());
  }

  /**
   * Convenience form of {@link #scanForDeadlocks(DeadlockResults)} that collects the reported chains.
   *
   * @return the deadlock chains that were reported, possibly empty
   */
  public DeadlockChain[] scanForDeadlocks() {
    final List chains = new ArrayList();
    DeadlockResults results = new DeadlockResults() {
      public void foundDeadlock(DeadlockChain chain) {
        chains.add(chain);
      }
    };

    scanForDeadlocks(results);

    return (DeadlockChain[]) chains.toArray(new DeadlockChain[chains.size()]);
  }

  /**
   * @return one MBean per lock present in the table at the time of the call
   */
  public LockMBean[] getAllLocks() {
    // Only the snapshot is taken under the monitor; the MBeans are built after it is released.
    final List copy;
    synchronized (this) {
      copy = new ArrayList(locks.size());
      copy.addAll(locks.values());
    }

    int count = 0;
    LockMBean[] rv = new LockMBean[copy.size()];
    for (Iterator i = copy.iterator(); i.hasNext();) {
      Lock lock = (Lock) i.next();
      rv[count++] = lock.getMBean(channelManager);
    }

    return rv;
  }

  /**
   * Moves the manager from STARTING to STARTED. Falls back to {@link #GREEDY_LOCK_POLICY} if no policy has been
   * set, pushes the policy to every re-established lock and tells it the manager has started, then replays the
   * lock requests that were queued while STARTING.
   *
   * @throws LockManagerError if the manager is not in the STARTING state
   */
  public void start() {
    synchronized (this) {
      assertStarting();
      changeState(STARTED);
      if (lockPolicy == UNINITIALIZED_LOCK_POLICY) {
        this.lockPolicy = GREEDY_LOCK_POLICY;
      }

      logger.debug("START Locks re-established -- " + locks.size());
      for (Iterator i = locks.values().iterator(); i.hasNext();) {
        Lock lock = (Lock) i.next();
        lock.setLockPolicy(lockPolicy);
        lock.notifyStarted(this, waitTimer);
      }

      // The state is already STARTED, so these calls are processed rather than queued again.
      for (Iterator i = lockRequestQueue.iterator(); i.hasNext();) {
        RequestLockContext ctxt = (RequestLockContext) i.next();
        requestLock(ctxt.lockID, ctxt.channelID, ctxt.threadID, ctxt.requestedLockLevel, ctxt.lockResponseSink,
                    ctxt.noBlock);
      }
      lockRequestQueue.clear();
    }
  }

  /**
   * Stops the manager. Blocks while the manager is still STARTING, then discards all locks and thread contexts,
   * shuts down the wait timer, switches to {@link #ALTRUISTIC_LOCK_POLICY} and ends in the STOPPED state.
   *
   * @throws InterruptedException if interrupted while waiting for the manager to leave the STARTING state
   * @throws LockManagerError if the manager is not STARTED once it has left the STARTING state
   */
  public synchronized void stop() throws InterruptedException {
    while (isStarting()) {
      wait();
    }
    assertStarted();
    cinfo("Stopping...");
    changeState(STOPPING);

    locks.clear();
    threadContextFactory.clear();

    if (waitTimer != null) {
      waitTimer.shutdown();
    }
    setLockPolicy(ALTRUISTIC_LOCK_POLICY);

    changeState(STOPPED);
    cinfo("Stopped.");
  }

  /**
   * @return the current lock policy: one of {@link #UNINITIALIZED_LOCK_POLICY}, {@link #GREEDY_LOCK_POLICY} or
   *         {@link #ALTRUISTIC_LOCK_POLICY}
   */
  public synchronized int getLockPolicy() {
    // Synchronized so the value written under the monitor by start()/setLockPolicy() is visible to the caller.
    return lockPolicy;
  }

  /**
   * Sets the lock policy and pushes it to every lock currently in the table.
   * <p>
   * Synchronized because it walks the lock table, which other threads modify under this object's monitor.
   *
   * @param lockPolicy {@link #GREEDY_LOCK_POLICY} or {@link #ALTRUISTIC_LOCK_POLICY}; any other value fails the
   *        {@code Assert.assertTrue} check
   */
  public synchronized void setLockPolicy(int lockPolicy) {
    Assert.assertTrue(lockPolicy == GREEDY_LOCK_POLICY || lockPolicy == ALTRUISTIC_LOCK_POLICY);
    this.lockPolicy = lockPolicy;
    for (Iterator i = locks.values().iterator(); i.hasNext();) {
      Lock lock = (Lock) i.next();
      lock.setLockPolicy(this.lockPolicy);
    }
  }

  /** Logs a lifecycle message to the customer logger at debug level. */
  private void cinfo(Object message) {
    clogger.debug("Lock Manager: " + message);
  }

  /**
   * Records the new lifecycle state and wakes up threads blocked on this manager's monitor (for example
   * {@link #stop()} waiting for startup to finish). Caller must hold this object's monitor.
   */
  private void changeState(State s) {
    this.status = s;
    notifyAll();
  }

  private boolean isStopped() {
    return status == STOPPED;
  }

  private boolean isStarted() {
    return status == STARTED;
  }

  private boolean isStarting() {
    return status == STARTING;
  }

  private void assertStarting() {
    if (!isStarting()) { throw new LockManagerError(NOT_STARTING_ERROR, "LockManager is not starting ("
                                                                        + this.status.getName() + ")"); }
  }

  private void assertStarted() {
    if (!isStarted()) { throw new LockManagerError(NOT_STARTED_ERROR, "LockManager is not started ("
                                                                      + this.status.getName() + ")"); }
  }

  private void assertNotStarting() {
    if (isStarting()) { throw new LockManagerError(IS_STARTING_ERROR, "LockManager is starting"); }
  }

  private void assertNotStopped() {
    if (isStopped()) { throw new LockManagerError(IS_STOPPED_ERROR, "LockManager is stopped"); }
  }

  /**
   * Thrown when an operation is invoked in a lifecycle state that does not allow it. The
   * {@link LockManagerErrorDescription} identifies which state check failed.
   */
  public static class LockManagerError extends Error {

    private final LockManagerErrorDescription desc;

    private LockManagerError(LockManagerErrorDescription desc, String msg) {
      super(msg);
      this.desc = desc;
    }

    /**
     * @return the description identifying the failed state check
     */
    public LockManagerErrorDescription getDescription() {
      return this.desc;
    }
  }

  /**
   * Named marker for a kind of {@link LockManagerError}. Instances are only created for the constants declared on
   * {@link LockManagerImpl}.
   */
  public static class LockManagerErrorDescription {
    private final String name;

    private LockManagerErrorDescription(String name) {
      this.name = name;
    }

    public String toString() {
      return getClass().getName() + "[" + this.name + "]";
    }
  }

  /**
   * Immutable record of a lock request that arrived while the manager was STARTING.
   */
  private static class RequestLockContext {
    final LockID    lockID;
    final ChannelID channelID;
    final ThreadID  threadID;
    final int       requestedLockLevel;
    final boolean   noBlock;
    final Sink      lockResponseSink;

    private RequestLockContext(LockID lockID, ChannelID channelID, ThreadID threadID, int requestedLockLevel,
                               Sink lockResponseSink, boolean noBlock) {
      this.lockID = lockID;
      this.channelID = channelID;
      this.threadID = threadID;
      this.requestedLockLevel = requestedLockLevel;
      this.lockResponseSink = lockResponseSink;
      this.noBlock = noBlock;
    }

    public String toString() {
      return "RequestLockContext [ " + lockID + "," + channelID + "," + threadID + ","
             + LockLevel.toString(requestedLockLevel) + ", " + noBlock + " ]";
    }
  }

  /**
   * Lifecycle state of the manager. Only the four constants declared on {@link LockManagerImpl} exist, so states
   * are compared by identity.
   */
  private static class State {
    private final String name;

    private State(String name) {
      this.name = name;
    }

    public String getName() {
      return this.name;
    }

    public String toString() {
      return getClass().getName() + "[" + this.name + "]";
    }
  }

}