package com.tc.object.lockmanager.impl;

import org.apache.commons.collections.map.ListOrderedMap;

import com.tc.logging.TCLogger;
import com.tc.object.lockmanager.api.ClientLockManager;
import com.tc.object.lockmanager.api.LockFlushCallback;
import com.tc.object.lockmanager.api.LockID;
import com.tc.object.lockmanager.api.LockLevel;
import com.tc.object.lockmanager.api.Notify;
import com.tc.object.lockmanager.api.QueryLockRequest;
import com.tc.object.lockmanager.api.RemoteLockManager;
import com.tc.object.lockmanager.api.ThreadID;
import com.tc.object.lockmanager.api.WaitListener;
import com.tc.object.lockmanager.api.WaitTimer;
import com.tc.object.session.SessionID;
import com.tc.object.session.SessionManager;
import com.tc.object.tx.WaitInvocation;
import com.tc.text.ConsoleParagraphFormatter;
import com.tc.text.ParagraphFormatter;
import com.tc.text.StringFormatter;
import com.tc.util.Assert;
import com.tc.util.State;
import com.tc.util.Util;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.TimerTask;

/**
 * Client-side lock manager. Keeps one {@link ClientLock} per {@link LockID} in {@code locksByID}, forwards lock
 * operations to it, and relays server responses (awards, recalls, notifies, query replies) back to the matching
 * lock or waiting thread.
 * <p>
 * The manager moves between three states: RUNNING, PAUSED and STARTING. Most client-facing operations block in
 * {@link #waitUntilRunning()} until the state is RUNNING; most server-originated callbacks are dropped with a warning
 * while the state is PAUSED.
 * <p>
 * Shared state is guarded by this object's monitor; {@code lockInfoByID} is additionally guarded by its own monitor.
 */
public class ClientLockManagerImpl implements ClientLockManager, LockFlushCallback {

  /** Delay before the first lock GC run and the period between runs, as passed to the timer in the constructor. */
  public static final long    TIMEOUT                      = 60 * 1000;

  private static final State  RUNNING                      = new State("RUNNING");
  private static final State  STARTING                     = new State("STARTING");
  private static final State  PAUSED                       = new State("PAUSED");

  private static final String MISSING_LOCK_TEXT            = makeMissingLockText();

  private State               state                        = RUNNING;
  /** LockID -> ClientLock. */
  private final Map           locksByID                    = new HashMap();
  /** ThreadID -> QueryLockRequest that has been sent but not yet answered. */
  private final Map           pendingQueryLockRequestsByID = new ListOrderedMap();
  /** ThreadID -> GlobalLockInfo reply waiting to be picked up by the querying thread. */
  private final Map           lockInfoByID                 = new HashMap();
  private final RemoteLockManager remoteLockManager;
  private final WaitTimer     waitTimer                    = new WaitTimerImpl();
  private final TCLogger      logger;
  private final SessionManager sessionManager;

  /**
   * Creates the manager and schedules a recurring {@link LockGCTask} on the wait timer, first after {@link #TIMEOUT}
   * and then every {@link #TIMEOUT}.
   */
  public ClientLockManagerImpl(TCLogger logger, RemoteLockManager remoteLockManager, SessionManager sessionManager) {
    this.logger = logger;
    this.remoteLockManager = remoteLockManager;
    this.sessionManager = sessionManager;
    waitTimer.getTimer().schedule(new LockGCTask(this), TIMEOUT, TIMEOUT);
  }

  /**
   * Switches to PAUSED and pauses every known lock.
   *
   * @throws AssertionError if the manager is already paused
   */
  public synchronized void pause() {
    if (state == PAUSED) throw new AssertionError("Attempt to pause while already paused : " + state);
    this.state = PAUSED;
    // Iterate over a snapshot of the current locks rather than the live map view.
    for (Iterator iter = new HashSet(locksByID.values()).iterator(); iter.hasNext();) {
      ClientLock lock = (ClientLock) iter.next();
      lock.pause();
    }
  }

  /**
   * Switches from PAUSED to STARTING.
   *
   * @throws AssertionError if the manager is not paused
   */
  public synchronized void starting() {
    if (state != PAUSED) throw new AssertionError("Attempt to start when not paused: " + state);
    this.state = STARTING;
  }

  /**
   * Switches from STARTING to RUNNING, wakes threads blocked in {@link #waitUntilRunning()}, unpauses every known
   * lock and re-sends all outstanding lock queries.
   *
   * @throws AssertionError if the manager is not in the STARTING state
   */
  public synchronized void unpause() {
    if (state != STARTING) throw new AssertionError("Attempt to unpause when not starting: " + state);
    this.state = RUNNING;
    notifyAll();
    // NOTE(review): pause() iterates over a copy of the values while this loop uses the live view - confirm that
    // ClientLock.unpause() can never add or remove entries of locksByID.
    for (Iterator iter = locksByID.values().iterator(); iter.hasNext();) {
      ClientLock lock = (ClientLock) iter.next();
      lock.unpause();
    }
    resubmitQueryLockRequests();
  }

  /** @return true if the manager is in the STARTING state */
  public synchronized boolean isStarting() {
    return state == STARTING;
  }

  /**
   * Waits until the manager is RUNNING, then recalls (as {@link ThreadID#VM_ID}, at WRITE level) every lock that
   * reports itself as timed out.
   */
  public synchronized void runGC() {
    waitUntilRunning();
    logger.info("Running Lock GC...");
    // Collect the IDs first; recall() may remove entries from locksByID through cleanUp().
    ArrayList toGC = new ArrayList(locksByID.size());
    for (Iterator iter = locksByID.values().iterator(); iter.hasNext();) {
      ClientLock lock = (ClientLock) iter.next();
      if (lock.timedout()) {
        toGC.add(lock.getLockID());
      }
    }
    if (toGC.size() > 0) {
      // Only list the individual IDs when there are at most ten of them.
      logger.debug("GCing " + (toGC.size() < 11 ? toGC.toString() : toGC.size() + " Locks ..."));
      for (Iterator iter = toGC.iterator(); iter.hasNext();) {
        LockID lockID = (LockID) iter.next();
        recall(lockID, ThreadID.VM_ID, LockLevel.WRITE);
      }
    }
  }

  /**
   * Sends a lock query for the given thread and blocks until {@link #queryLockCommit} delivers the reply.
   *
   * @return the reply stored for {@code threadID}; the entry is removed from {@code lockInfoByID}
   */
  private GlobalLockInfo getLockInfo(LockID lockID, ThreadID threadID) {
    Object waitLock = addToPendingQueryLockRequest(lockID, threadID);
    remoteLockManager.queryLock(lockID, threadID);
    waitForQueryReply(threadID, waitLock);
    GlobalLockInfo lockInfo;
    synchronized (lockInfoByID) {
      lockInfo = (GlobalLockInfo) lockInfoByID.remove(threadID);
    }
    return lockInfo;
  }

  /**
   * @return the request-queue length plus upgrade-queue length reported by the lock query, plus the local lock's
   *         queue length when a local lock exists
   */
  public int queueLength(LockID lockID, ThreadID threadID) {
    ClientLock lock;
    synchronized (this) {
      waitUntilRunning();
      lock = getLock(lockID);
    }
    GlobalLockInfo lockInfo = getLockInfo(lockID, threadID);

    int queueLength = lockInfo.getLockRequestQueueLength() + lockInfo.getLockUpgradeQueueLength();
    if (lock != null) {
      queueLength += lock.queueLength();
    }
    return queueLength;
  }

  /**
   * @return the number of waiters reported by the lock query, plus the local lock's wait length when a local lock
   *         exists
   */
  public int waitLength(LockID lockID, ThreadID threadID) {
    ClientLock lock;
    synchronized (this) {
      waitUntilRunning();
      lock = getLock(lockID);
    }

    GlobalLockInfo lockInfo = getLockInfo(lockID, threadID);
    int waitLength = lockInfo.getWaitersInfo().size();

    if (lock != null) { return waitLength + lock.waitLength(); }

    return waitLength;
  }

  /** @return the local lock's held count for the thread and level, or 0 when no local lock exists */
  public int localHeldCount(LockID lockID, int lockLevel, ThreadID threadID) {
    ClientLock lock;
    synchronized (this) {
      waitUntilRunning();
      lock = (ClientLock) locksByID.get(lockID);
    }
    if (lock == null) {
      return 0;
    } else {
      return lock.localHeldCount(threadID, lockLevel);
    }
  }

  /**
   * @return {@code isHeld()} of the local lock when one exists; otherwise the {@code isLocked()} value of a lock
   *         query reply
   */
  public boolean isLocked(LockID lockID, ThreadID threadID) {
    ClientLock lock;
    synchronized (this) {
      waitUntilRunning();
      lock = (ClientLock) locksByID.get(lockID);
    }
    if (lock != null) {
      return lock.isHeld();
    } else {
      GlobalLockInfo lockInfo = getLockInfo(lockID, threadID);
      return lockInfo.isLocked();
    }
  }

  /**
   * Blocks on {@code waitLock} until a reply for {@code threadID} is present in {@code lockInfoByID}. Interrupts do
   * not abort the wait; they are remembered and handed to {@link Util#selfInterruptIfNeeded} afterwards.
   */
  private void waitForQueryReply(ThreadID threadID, Object waitLock) {
    boolean isInterrupted = false;

    synchronized (waitLock) {
      while (!hasLockInfo(threadID)) {
        try {
          waitLock.wait();
        } catch (InterruptedException e) {
          isInterrupted = true;
        }
      }
    }
    Util.selfInterruptIfNeeded(isInterrupted);
  }

  /** @return true if a query reply is stored for the given thread */
  private boolean hasLockInfo(ThreadID threadID) {
    synchronized (lockInfoByID) {
      return lockInfoByID.containsKey(threadID);
    }
  }

  /**
   * Looks up or creates the lock, bumps its use count while holding the manager monitor, then calls
   * {@code lock(threadID, type)} on it outside the monitor.
   */
  public void lock(LockID lockID, ThreadID threadID, int type) {
    Assert.assertNotNull("threadID", threadID);
    final ClientLock lock;

    synchronized (this) {
      waitUntilRunning();
      lock = getOrCreateLock(lockID);
      lock.incUseCount();
    }
    lock.lock(threadID, type);
  }

  /**
   * Like {@link #lock}, but delegates to {@code tryLock} on the lock. When the attempt fails the use count is
   * decremented again and the lock is removed from the table if it is clear.
   *
   * @return the result of the lock's {@code tryLock}
   */
  public boolean tryLock(LockID lockID, ThreadID threadID, int type) {
    Assert.assertNotNull("threadID", threadID);
    final ClientLock lock;

    synchronized (this) {
      waitUntilRunning();
      lock = getOrCreateLock(lockID);
      lock.incUseCount();
    }
    boolean isLocked = lock.tryLock(threadID, type);
    if (!isLocked) {
      synchronized (this) {
        lock.decUseCount();
      }
      cleanUp(lock);
    }
    return isLocked;
  }

  /**
   * Decrements the lock's use count, unlocks it for the thread and removes it from the table if it is clear.
   *
   * @throws AssertionError if no lock is registered under {@code lockID}
   */
  public void unlock(LockID lockID, ThreadID threadID) {
    final ClientLock myLock;

    synchronized (this) {
      waitUntilRunning();
      myLock = (ClientLock) locksByID.get(lockID);
      if (myLock == null) { throw missingLockException(lockID); }
      myLock.decUseCount();
    }

    myLock.unlock(threadID);
    cleanUp(myLock);
  }

  /** Builds the error raised when an operation targets a lock ID that is not in {@code locksByID}. */
  private AssertionError missingLockException(LockID lockID) {
    return new AssertionError(MISSING_LOCK_TEXT + " Missing lock ID is " + lockID);
  }

  /**
   * Delegates a wait call to the lock registered under {@code lockID}.
   *
   * @throws AssertionError if no lock is registered under {@code lockID}
   * @throws InterruptedException propagated from the lock's {@code wait}
   */
  public void wait(LockID lockID, ThreadID threadID, WaitInvocation call, Object waitLock, WaitListener listener)
      throws InterruptedException {
    final ClientLock myLock;
    synchronized (this) {
      waitUntilRunning();
      myLock = (ClientLock) locksByID.get(lockID);
    }
    if (myLock == null) { throw missingLockException(lockID); }
    myLock.wait(threadID, call, waitLock, listener);
  }

  /**
   * Delegates a notify / notifyAll call to the lock registered under {@code lockID}.
   *
   * @throws AssertionError if no lock is registered under {@code lockID}
   */
  public Notify notify(LockID lockID, ThreadID threadID, boolean all) {
    final ClientLock myLock;
    synchronized (this) {
      waitUntilRunning();
      myLock = (ClientLock) locksByID.get(lockID);
    }
    if (myLock == null) { throw missingLockException(lockID); }
    return myLock.notify(threadID, all);
  }

  /**
   * Handles a recall of the given lock. The request is dropped with a warning while the manager is paused and is
   * silently ignored when the lock is unknown. {@code threadID} must be {@link ThreadID#VM_ID}.
   */
  public synchronized void recall(LockID lockID, ThreadID threadID, int interestedLevel) {
    Assert.assertEquals(ThreadID.VM_ID, threadID);
    if (isPaused()) {
      logger.warn("Ignoring recall request from dead server : " + lockID + ", " + threadID + " interestedLevel : "
                  + LockLevel.toString(interestedLevel));
      return;
    }
    final ClientLock myLock = (ClientLock) locksByID.get(lockID);
    if (myLock != null) {
      myLock.recall(interestedLevel, this);
      cleanUp(myLock);
    }
  }

  /**
   * {@link LockFlushCallback} entry point: forwards the flush notification to the lock, if it still exists, and
   * removes the lock from the table if it is clear afterwards.
   */
  public void transactionsForLockFlushed(LockID lockID) {
    final ClientLock myLock;
    synchronized (this) {
      waitUntilRunning();
      myLock = (ClientLock) locksByID.get(lockID);
    }
    if (myLock != null) {
      myLock.transactionsForLockFlushed(lockID);
      cleanUp(myLock);
    }
  }

  /**
   * Delivers the reply to an outstanding lock query and wakes the thread blocked in {@link #waitForQueryReply}.
   *
   * @throws AssertionError if no query is pending for {@code threadID}; in that case nothing is stored
   */
  public synchronized void queryLockCommit(ThreadID threadID, GlobalLockInfo globalLockInfo) {
    // Validate before publishing: storing the reply first would leave a stale entry in lockInfoByID on the error
    // path, which the next query from the same thread would pick up immediately instead of waiting for its own reply.
    QueryLockRequest qRequest = (QueryLockRequest) pendingQueryLockRequestsByID.remove(threadID);
    if (qRequest == null) { throw new AssertionError("Query Lock request does not exist."); }
    synchronized (lockInfoByID) {
      lockInfoByID.put(threadID, globalLockInfo);
    }
    Object waitLock = qRequest.getWaitLock();
    synchronized (waitLock) {
      waitLock.notifyAll();
    }
  }

  /** Handles a wait timeout exactly like a notification; see {@link #notified}. */
  public synchronized void waitTimedOut(LockID lockID, ThreadID threadID) {
    notified(lockID, threadID);
  }

  /** Removes the lock from {@code locksByID} if it is clear and is still the instance registered under its ID. */
  private synchronized void cleanUp(ClientLock lock) {
    if (lock.isClear()) {
      Object o = locksByID.get(lock.getLockID());
      // Identity check: a different ClientLock may have been registered under the same ID in the meantime.
      if (o == lock) {
        locksByID.remove(lock.getLockID());
      }
    }
  }

  /**
   * Forwards a notification for the given thread to the lock. Dropped with a warning while the manager is paused.
   *
   * @throws AssertionError if no lock is registered under {@code lockID}
   */
  public synchronized void notified(LockID lockID, ThreadID threadID) {
    if (isPaused()) {
      logger.warn("Ignoring notified call from dead server : " + lockID + ", " + threadID);
      return;
    }
    final ClientLock myLock = (ClientLock) locksByID.get(lockID);
    if (myLock == null) { throw new AssertionError(lockID.toString()); }
    myLock.notified(threadID);
  }

  /**
   * Forwards a lock award to the lock. Dropped with a warning while the manager is paused or when
   * {@code sessionID} is not the current session.
   *
   * @throws AssertionError if no lock is registered under {@code lockID}
   */
  public synchronized void awardLock(SessionID sessionID, LockID lockID, ThreadID threadID, int level) {
    if (isPaused() || !sessionManager.isCurrentSession(sessionID)) {
      logger.warn("Ignoring lock award from a dead server :" + sessionID + ", " + sessionManager + " : " + lockID + " "
                  + threadID + " " + LockLevel.toString(level) + " state = " + state);
      return;
    }
    final ClientLock lock = (ClientLock) locksByID.get(lockID);
    if (lock == null) { throw new AssertionError("awardLock(): Lock not found" + lockID.toString() + " :: " + threadID
                                                 + " :: " + LockLevel.toString(level)); }
    lock.awardLock(threadID, level);
  }

  /**
   * Forwards a "cannot award" response to the lock. Dropped with a warning while the manager is paused or when
   * {@code sessionID} is not the current session.
   *
   * @throws AssertionError if no lock is registered under {@code lockID}
   */
  public synchronized void cannotAwardLock(SessionID sessionID, LockID lockID, ThreadID threadID, int level) {
    if (isPaused() || !sessionManager.isCurrentSession(sessionID)) {
      logger.warn("Ignoring cannotAwardLock from a dead server :" + sessionID + ", " + sessionManager + " : " + lockID
                  + " " + threadID + " level = " + level + " state = " + state);
      return;
    }
    final ClientLock lock = (ClientLock) locksByID.get(lockID);
    if (lock == null) { throw new AssertionError("cannotAwardLock(): Lock not found" + lockID.toString() + " :: "
                                                 + threadID + " :: " + LockLevel.toString(level)); }
    lock.cannotAwardLock(threadID, level);
  }

  /** Plain lookup in {@code locksByID}; not synchronized itself, callers in this class hold the manager monitor. */
  private ClientLock getLock(LockID id) {
    return (ClientLock) locksByID.get(id);
  }

  /** Returns the lock registered under {@code id}, creating and registering a new one when none exists. */
  private synchronized ClientLock getOrCreateLock(LockID id) {
    ClientLock lock = (ClientLock) locksByID.get(id);
    if (lock == null) {
      lock = new ClientLock(id, remoteLockManager, waitTimer);
      locksByID.put(id, lock);
    }
    return lock;
  }

  /** @return {@link LockID#NULL_ID} for a null name, otherwise a new {@link LockID} wrapping the name */
  public LockID lockIDFor(String id) {
    if (id == null) return LockID.NULL_ID;
    return new LockID(id);
  }

  /**
   * Asks every lock to add its waiters to {@code c}.
   *
   * @return {@code c}
   * @throws AssertionError if the manager is not in the STARTING state
   */
  public synchronized Collection addAllWaitersTo(Collection c) {
    assertStarting();
    for (Iterator i = locksByID.values().iterator(); i.hasNext();) {
      ClientLock lock = (ClientLock) i.next();
      lock.addAllWaitersTo(c);
    }
    return c;
  }

  /**
   * Asks every lock to add its holders, as lock requests, to {@code c}.
   *
   * @return {@code c}
   * @throws AssertionError if the manager is not in the STARTING state
   */
  public synchronized Collection addAllHeldLocksTo(Collection c) {
    assertStarting();
    for (Iterator i = locksByID.values().iterator(); i.hasNext();) {
      ClientLock lock = (ClientLock) i.next();
      lock.addHoldersToAsLockRequests(c);
    }
    return c;
  }

  /**
   * Asks every lock to add its pending lock requests to {@code c}.
   *
   * @return {@code c}
   * @throws AssertionError if the manager is not in the STARTING state
   */
  public synchronized Collection addAllPendingLockRequestsTo(Collection c) {
    assertStarting();
    for (Iterator i = locksByID.values().iterator(); i.hasNext();) {
      ClientLock lock = (ClientLock) i.next();
      lock.addAllPendingLockRequestsTo(c);
    }
    return c;
  }

  /** @return true if a lock is registered under {@code lockID} and reports being held by the thread at that type */
  synchronized boolean haveLock(LockID lockID, ThreadID threadID, int lockType) {
    ClientLock l = (ClientLock) locksByID.get(lockID);
    if (l == null) { return false; }
    return l.isHeldBy(threadID, lockType);
  }

  /**
   * Blocks until the state is RUNNING. Calls {@code wait()} on this object, so the caller must already hold the
   * manager monitor. Interrupts do not abort the wait; they are remembered and handed to
   * {@link Util#selfInterruptIfNeeded} afterwards.
   */
  private void waitUntilRunning() {
    boolean isInterrupted = false;
    while (!isRunning()) {
      try {
        wait();
      } catch (InterruptedException e) {
        isInterrupted = true;
      }
    }
    Util.selfInterruptIfNeeded(isInterrupted);
  }

  /** @return true if the manager is in the RUNNING state */
  public synchronized boolean isRunning() {
    return (state == RUNNING);
  }

  /** @return true if the manager is in the PAUSED state */
  public synchronized boolean isPaused() {
    return (state == PAUSED);
  }

  /** @throws AssertionError if the manager is not in the STARTING state */
  private void assertStarting() {
    if (state != STARTING) throw new AssertionError("ClientLockManager is not STARTING : " + state);
  }

  /**
   * Registers a pending lock query for the thread.
   *
   * @return the fresh monitor object the querying thread should wait on
   * @throws AssertionError if the thread already has a query outstanding
   */
  private synchronized Object addToPendingQueryLockRequest(LockID lockID, ThreadID threadID) {
    Object o = new Object();
    QueryLockRequest qRequest = new QueryLockRequest(lockID, threadID, o);
    Object old = pendingQueryLockRequestsByID.put(threadID, qRequest);
    if (old != null) { throw new AssertionError("Query Lock request already outstanding - " + old); }

    return o;
  }

  /** Re-sends every pending lock query through the remote lock manager. */
  private synchronized void resubmitQueryLockRequests() {
    for (Iterator i = pendingQueryLockRequestsByID.values().iterator(); i.hasNext();) {
      QueryLockRequest qRequest = (QueryLockRequest) i.next();
      remoteLockManager.queryLock(qRequest.lockID(), qRequest.threadID());
    }
  }

  /** Builds the explanatory text used by {@link #missingLockException}, formatted by a 72-column paragraph formatter. */
  private static String makeMissingLockText() {
    ParagraphFormatter formatter = new ConsoleParagraphFormatter(72, new StringFormatter());

    String message = "An operation to a DSO lock was attempted for a lock that does not yet exist. This is usually the result ";
    message += "of an object becoming shared in the middle of synchronized block on that object (in which case the monitorExit ";
    message += "call will produce this exception). Additionally, attempts to wait()/notify()/notifyAll() on an object in such a block will ";
    message += "also fail. To workaround this problem, the object/lock need to become shared in the scope of a different (earlier) ";
    message += "synchronization block.";

    return formatter.format(message);
  }

  /** Timer task that triggers {@link ClientLockManager#runGC()} on the given manager. */
  static class LockGCTask extends TimerTask {

    final ClientLockManager lockManager;

    LockGCTask(ClientLockManager mgr) {
      lockManager = mgr;
    }

    public void run() {
      lockManager.runGC();
    }
  }
}