KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > object > lockmanager > impl > ClientLockManagerImpl


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.object.lockmanager.impl;
6
7 import org.apache.commons.collections.map.ListOrderedMap;
8
9 import com.tc.logging.TCLogger;
10 import com.tc.object.lockmanager.api.ClientLockManager;
11 import com.tc.object.lockmanager.api.LockFlushCallback;
12 import com.tc.object.lockmanager.api.LockID;
13 import com.tc.object.lockmanager.api.LockLevel;
14 import com.tc.object.lockmanager.api.Notify;
15 import com.tc.object.lockmanager.api.QueryLockRequest;
16 import com.tc.object.lockmanager.api.RemoteLockManager;
17 import com.tc.object.lockmanager.api.ThreadID;
18 import com.tc.object.lockmanager.api.WaitListener;
19 import com.tc.object.lockmanager.api.WaitTimer;
20 import com.tc.object.session.SessionID;
21 import com.tc.object.session.SessionManager;
22 import com.tc.object.tx.WaitInvocation;
23 import com.tc.text.ConsoleParagraphFormatter;
24 import com.tc.text.ParagraphFormatter;
25 import com.tc.text.StringFormatter;
26 import com.tc.util.Assert;
27 import com.tc.util.State;
28 import com.tc.util.Util;
29
30 import java.util.ArrayList JavaDoc;
31 import java.util.Collection JavaDoc;
32 import java.util.HashMap JavaDoc;
33 import java.util.HashSet JavaDoc;
34 import java.util.Iterator JavaDoc;
35 import java.util.Map JavaDoc;
36 import java.util.TimerTask JavaDoc;
37
38 /**
39  * @author steve
40  */

41 public class ClientLockManagerImpl implements ClientLockManager, LockFlushCallback {
42
43   public static final long TIMEOUT = 60 * 1000;
44
45   private static final State RUNNING = new State("RUNNING");
46   private static final State STARTING = new State("STARTING");
47   private static final State PAUSED = new State("PAUSED");
48
49   private static final String JavaDoc MISSING_LOCK_TEXT = makeMissingLockText();
50
51   private State state = RUNNING;
52   private final Map locksByID = new HashMap JavaDoc();
53   private final Map pendingQueryLockRequestsByID = new ListOrderedMap();
54   private final Map lockInfoByID = new HashMap JavaDoc();
55   private final RemoteLockManager remoteLockManager;
56   private final WaitTimer waitTimer = new WaitTimerImpl();
57   private final TCLogger logger;
58   private final SessionManager sessionManager;
59
60   public ClientLockManagerImpl(TCLogger logger, RemoteLockManager remoteLockManager, SessionManager sessionManager) {
61     this.logger = logger;
62     this.remoteLockManager = remoteLockManager;
63     this.sessionManager = sessionManager;
64     waitTimer.getTimer().schedule(new LockGCTask(this), TIMEOUT, TIMEOUT);
65   }
66
67   public synchronized void pause() {
68     if (state == PAUSED) throw new AssertionError JavaDoc("Attempt to pause while already paused : " + state);
69     this.state = PAUSED;
70     for (Iterator JavaDoc iter = new HashSet JavaDoc(locksByID.values()).iterator(); iter.hasNext();) {
71       ClientLock lock = (ClientLock) iter.next();
72       lock.pause();
73     }
74   }
75
76   public synchronized void starting() {
77     if (state != PAUSED) throw new AssertionError JavaDoc("Attempt to start when not paused: " + state);
78     this.state = STARTING;
79   }
80
81   public synchronized void unpause() {
82     if (state != STARTING) throw new AssertionError JavaDoc("Attempt to unpause when not starting: " + state);
83     this.state = RUNNING;
84     notifyAll();
85     for (Iterator JavaDoc iter = locksByID.values().iterator(); iter.hasNext();) {
86       ClientLock lock = (ClientLock) iter.next();
87       lock.unpause();
88     }
89     resubmitQueryLockRequests();
90   }
91
92   public synchronized boolean isStarting() {
93     return state == STARTING;
94   }
95
96   public synchronized void runGC() {
97     waitUntilRunning();
98     logger.info("Running Lock GC...");
99     ArrayList JavaDoc toGC = new ArrayList JavaDoc(locksByID.size());
100     for (Iterator JavaDoc iter = locksByID.values().iterator(); iter.hasNext();) {
101       ClientLock lock = (ClientLock) iter.next();
102       if (lock.timedout()) {
103         toGC.add(lock.getLockID());
104       }
105     }
106     if (toGC.size() > 0) {
107       logger.debug("GCing " + (toGC.size() < 11 ? toGC.toString() : toGC.size() + " Locks ..."));
108       for (Iterator JavaDoc iter = toGC.iterator(); iter.hasNext();) {
109         LockID lockID = (LockID) iter.next();
110         recall(lockID, ThreadID.VM_ID, LockLevel.WRITE);
111       }
112     }
113   }
114
115   private GlobalLockInfo getLockInfo(LockID lockID, ThreadID threadID) {
116     Object JavaDoc waitLock = addToPendingQueryLockRequest(lockID, threadID);
117     remoteLockManager.queryLock(lockID, threadID);
118     waitForQueryReply(threadID, waitLock);
119     GlobalLockInfo lockInfo;
120     synchronized (lockInfoByID) {
121       lockInfo = (GlobalLockInfo) lockInfoByID.remove(threadID);
122     }
123     return lockInfo;
124   }
125
126   // TODO:
127
// Needs to take care of the greedy lock case.
128
public int queueLength(LockID lockID, ThreadID threadID) {
129     ClientLock lock;
130     synchronized (this) {
131       waitUntilRunning();
132       lock = getLock(lockID);
133     }
134     GlobalLockInfo lockInfo = getLockInfo(lockID, threadID);
135
136     int queueLength = lockInfo.getLockRequestQueueLength() + lockInfo.getLockUpgradeQueueLength();
137     if (lock != null) {
138       queueLength += lock.queueLength();
139     }
140     return queueLength;
141   }
142
143   // TODO:
144
// Needs to take care of the greedy lock case.
145
public int waitLength(LockID lockID, ThreadID threadID) {
146     ClientLock lock;
147     synchronized (this) {
148       waitUntilRunning();
149       lock = getLock(lockID);
150     }
151
152     GlobalLockInfo lockInfo = getLockInfo(lockID, threadID);
153     int waitLength = lockInfo.getWaitersInfo().size();
154
155     if (lock != null) {
156       return waitLength + lock.waitLength();
157     }
158
159     return waitLength;
160   }
161
162   public int localHeldCount(LockID lockID, int lockLevel, ThreadID threadID) {
163     ClientLock lock;
164     synchronized (this) {
165       waitUntilRunning();
166       lock = (ClientLock) locksByID.get(lockID);
167     }
168     if (lock == null) {
169       return 0;
170     } else {
171       return lock.localHeldCount(threadID, lockLevel);
172     }
173   }
174
175   // TODO:
176
// Needs to take care of the greedy lock case.
177
public boolean isLocked(LockID lockID, ThreadID threadID) {
178     ClientLock lock;
179     synchronized (this) {
180       waitUntilRunning();
181       lock = (ClientLock) locksByID.get(lockID);
182     }
183     if (lock != null) {
184       return lock.isHeld();
185     } else {
186       GlobalLockInfo lockInfo = getLockInfo(lockID, threadID);
187       return lockInfo.isLocked();
188     }
189   }
190
191   private void waitForQueryReply(ThreadID threadID, Object JavaDoc waitLock) {
192     boolean isInterrupted = false;
193
194     synchronized (waitLock) {
195       while (!hasLockInfo(threadID)) {
196         try {
197           waitLock.wait();
198         } catch (InterruptedException JavaDoc ioe) {
199           isInterrupted = true;
200         }
201       }
202     }
203     Util.selfInterruptIfNeeded(isInterrupted);
204   }
205
206   private boolean hasLockInfo(ThreadID threadID) {
207     synchronized (lockInfoByID) {
208       return lockInfoByID.containsKey(threadID);
209     }
210   }
211
212   public void lock(LockID lockID, ThreadID threadID, int type) {
213     Assert.assertNotNull("threadID", threadID);
214     final ClientLock lock;
215
216     synchronized (this) {
217       waitUntilRunning();
218       lock = getOrCreateLock(lockID);
219       lock.incUseCount();
220     }
221     lock.lock(threadID, type);
222   }
223
224   public boolean tryLock(LockID lockID, ThreadID threadID, int type) {
225     Assert.assertNotNull("threadID", threadID);
226     final ClientLock lock;
227
228     synchronized (this) {
229       waitUntilRunning();
230       lock = getOrCreateLock(lockID);
231       lock.incUseCount();
232     }
233     boolean isLocked = lock.tryLock(threadID, type);
234     if (!isLocked) {
235       synchronized (this) {
236         lock.decUseCount();
237       }
238       cleanUp(lock);
239     }
240     return isLocked;
241   }
242
243   public void unlock(LockID lockID, ThreadID threadID) {
244     final ClientLock myLock;
245
246     synchronized (this) {
247       waitUntilRunning();
248       myLock = (ClientLock) locksByID.get(lockID);
249       if (myLock == null) { throw missingLockException(lockID); }
250       myLock.decUseCount();
251     }
252
253     myLock.unlock(threadID);
254     cleanUp(myLock);
255   }
256
257   private AssertionError JavaDoc missingLockException(LockID lockID) {
258     return new AssertionError JavaDoc(MISSING_LOCK_TEXT + " Missing lock ID is " + lockID);
259   }
260
261   public void wait(LockID lockID, ThreadID threadID, WaitInvocation call, Object JavaDoc waitLock, WaitListener listener)
262       throws InterruptedException JavaDoc {
263     final ClientLock myLock;
264     synchronized (this) {
265       waitUntilRunning();
266       myLock = (ClientLock) locksByID.get(lockID);
267     }
268     if (myLock == null) { throw missingLockException(lockID); }
269     myLock.wait(threadID, call, waitLock, listener);
270   }
271
272   public Notify notify(LockID lockID, ThreadID threadID, boolean all) {
273     final ClientLock myLock;
274     synchronized (this) {
275       waitUntilRunning();
276       myLock = (ClientLock) locksByID.get(lockID);
277     }
278     if (myLock == null) { throw missingLockException(lockID); }
279     return myLock.notify(threadID, all);
280   }
281
282   /*
283    * The level represents the reason why server wanted a recall and will determite when a recall commit will happen.
284    */

285   public synchronized void recall(LockID lockID, ThreadID threadID, int interestedLevel) {
286     Assert.assertEquals(ThreadID.VM_ID, threadID);
287     if (isPaused()) {
288       logger.warn("Ignoring recall request from dead server : " + lockID + ", " + threadID + " interestedLevel : "
289                   + LockLevel.toString(interestedLevel));
290       return;
291     }
292     final ClientLock myLock = (ClientLock) locksByID.get(lockID);
293     if (myLock != null) {
294       myLock.recall(interestedLevel, this);
295       cleanUp(myLock);
296     }
297   }
298
299   public void transactionsForLockFlushed(LockID lockID) {
300     final ClientLock myLock;
301     synchronized (this) {
302       waitUntilRunning();
303       myLock = (ClientLock) locksByID.get(lockID);
304     }
305     if (myLock != null) {
306       myLock.transactionsForLockFlushed(lockID);
307       cleanUp(myLock);
308     }
309   }
310
311   /*
312    * Called from a stage thread and should never be blocked XXX:: I am currently not ignoring reponses from dead server
313    * because of a bug during server restart case. check out https://jira.terracotta.org/jira/browse/DEV-448 . After
314    * fixing that, one can ignore responses while in paused state.
315    */

316   public synchronized void queryLockCommit(ThreadID threadID, GlobalLockInfo globalLockInfo) {
317     synchronized (lockInfoByID) {
318       lockInfoByID.put(threadID, globalLockInfo);
319     }
320     QueryLockRequest qRequest = (QueryLockRequest)pendingQueryLockRequestsByID.remove(threadID);
321     if (qRequest == null) { throw new AssertionError JavaDoc("Query Lock request does not exist."); }
322     Object JavaDoc waitLock = qRequest.getWaitLock();
323     synchronized (waitLock) {
324       waitLock.notifyAll();
325     }
326   }
327
328   public synchronized void waitTimedOut(LockID lockID, ThreadID threadID) {
329     notified(lockID, threadID);
330   }
331
332   private synchronized void cleanUp(ClientLock lock) {
333     if (lock.isClear()) {
334       Object JavaDoc o = locksByID.get(lock.getLockID());
335       if (o == lock) {
336         // Sometimes when called from recall, the unlock would have already removed this lock
337
// from the map and a new lock could be in the map from a new lock request. We dont want to
338
// remove that
339
locksByID.remove(lock.getLockID());
340       }
341     }
342   }
343
344   /*
345    * Called from a stage thread and should never be blocked
346    */

347   public synchronized void notified(LockID lockID, ThreadID threadID) {
348     if (isPaused()) {
349       logger.warn("Ignoring notified call from dead server : " + lockID + ", " + threadID);
350       return;
351     }
352     final ClientLock myLock = (ClientLock) locksByID.get(lockID);
353     if (myLock == null) { throw new AssertionError JavaDoc(lockID.toString()); }
354     myLock.notified(threadID);
355   }
356
357   /*
358    * XXX::This method is called from a stage thread. It operate on the lock inside the scope of the synchronization
359    * unlike other methods because, we want to decide whether to process this award or not and go with it atomically
360    */

361   public synchronized void awardLock(SessionID sessionID, LockID lockID, ThreadID threadID, int level) {
362     if (isPaused() || !sessionManager.isCurrentSession(sessionID)) {
363       logger.warn("Ignoring lock award from a dead server :" + sessionID + ", " + sessionManager + " : " + lockID + " "
364                   + threadID + " " + LockLevel.toString(level) + " state = " + state);
365       return;
366     }
367     final ClientLock lock = (ClientLock) locksByID.get(lockID);
368     if (lock == null) { throw new AssertionError JavaDoc("awardLock(): Lock not found" + lockID.toString() + " :: " + threadID
369                                                  + " :: " + LockLevel.toString(level)); }
370     lock.awardLock(threadID, level);
371   }
372
373   /*
374    * XXX:: @read comment for awardLock();
375    */

376   public synchronized void cannotAwardLock(SessionID sessionID, LockID lockID, ThreadID threadID, int level) {
377     if (isPaused() || !sessionManager.isCurrentSession(sessionID)) {
378       logger.warn("Ignoring lock award from a dead server :" + sessionID + ", " + sessionManager + " : " + lockID + " "
379                   + threadID + " level = " + level + " state = " + state);
380       return;
381     }
382     final ClientLock lock = (ClientLock) locksByID.get(lockID);
383     if (lock == null) { throw new AssertionError JavaDoc("awardLock(): Lock not found" + lockID.toString() + " :: " + threadID
384                                                  + " :: " + LockLevel.toString(level)); }
385     lock.cannotAwardLock(threadID, level);
386   }
387
388   // This method should be called within a synchronized(this) block.
389
private ClientLock getLock(LockID id) {
390     return (ClientLock) locksByID.get(id);
391   }
392
393   private synchronized ClientLock getOrCreateLock(LockID id) {
394     ClientLock lock = (ClientLock) locksByID.get(id);
395     if (lock == null) {
396       lock = new ClientLock(id, remoteLockManager, waitTimer);
397       locksByID.put(id, lock);
398     }
399     return lock;
400   }
401
402   public LockID lockIDFor(String JavaDoc id) {
403     if (id == null) return LockID.NULL_ID;
404     return new LockID(id);
405   }
406
407   public synchronized Collection JavaDoc addAllWaitersTo(Collection JavaDoc c) {
408     assertStarting();
409     for (Iterator JavaDoc i = locksByID.values().iterator(); i.hasNext();) {
410       ClientLock lock = (ClientLock) i.next();
411       lock.addAllWaitersTo(c);
412     }
413     return c;
414   }
415
416   public synchronized Collection JavaDoc addAllHeldLocksTo(Collection JavaDoc c) {
417     assertStarting();
418     for (Iterator JavaDoc i = locksByID.values().iterator(); i.hasNext();) {
419       ClientLock lock = (ClientLock) i.next();
420       lock.addHoldersToAsLockRequests(c);
421     }
422     return c;
423   }
424
425   public synchronized Collection JavaDoc addAllPendingLockRequestsTo(Collection JavaDoc c) {
426     assertStarting();
427     for (Iterator JavaDoc i = locksByID.values().iterator(); i.hasNext();) {
428       ClientLock lock = (ClientLock) i.next();
429       lock.addAllPendingLockRequestsTo(c);
430     }
431     return c;
432   }
433
434   synchronized boolean haveLock(LockID lockID, ThreadID threadID, int lockType) {
435     ClientLock l = (ClientLock) locksByID.get(lockID);
436     if (l == null) { return false; }
437     return l.isHeldBy(threadID, lockType);
438   }
439
440   private void waitUntilRunning() {
441     boolean isInterrupted = false;
442     while (!isRunning()) {
443       try {
444         wait();
445       } catch (InterruptedException JavaDoc e) {
446         isInterrupted = true;
447       }
448     }
449     Util.selfInterruptIfNeeded(isInterrupted);
450   }
451
452   public synchronized boolean isRunning() {
453     return (state == RUNNING);
454   }
455
456   public synchronized boolean isPaused() {
457     return (state == PAUSED);
458   }
459
460   private void assertStarting() {
461     if (state != STARTING) throw new AssertionError JavaDoc("ClientLockManager is not STARTING : " + state);
462   }
463
464   /*
465    * @returns the wait object for lock request
466    */

467   private synchronized Object JavaDoc addToPendingQueryLockRequest(LockID lockID, ThreadID threadID) {
468     // Add Lock Request
469
Object JavaDoc o = new Object JavaDoc();
470     QueryLockRequest qRequest = new QueryLockRequest(lockID, threadID, o);
471     Object JavaDoc old = pendingQueryLockRequestsByID.put(threadID, qRequest);
472     if (old != null) {
473       // formatting
474
throw new AssertionError JavaDoc("Query Lock request already outstanding - " + old);
475     }
476
477     return o;
478   }
479   
480   private synchronized void resubmitQueryLockRequests() {
481     for (Iterator JavaDoc i=pendingQueryLockRequestsByID.values().iterator(); i.hasNext(); ) {
482       QueryLockRequest qRequest = (QueryLockRequest)i.next();
483       remoteLockManager.queryLock(qRequest.lockID(), qRequest.threadID());
484     }
485   }
486
487   private static String JavaDoc makeMissingLockText() {
488     ParagraphFormatter formatter = new ConsoleParagraphFormatter(72, new StringFormatter());
489
490     String JavaDoc message = "An operation to a DSO lock was attempted for a lock that does not yet exist. This is usually the result ";
491     message += "of an object becoming shared in the middle of synchronized block on that object (in which case the monitorExit ";
492     message += "call will produce this exception). Additionally, attempts to wait()/notify()/notifyAll() on an object in such a block will ";
493     message += "also fail. To workaround this problem, the object/lock need to become shared in the scope of a different (earlier) ";
494     message += "synchronization block.";
495
496     return formatter.format(message);
497   }
498
499   static class LockGCTask extends TimerTask JavaDoc {
500
501     final ClientLockManager lockManager;
502
503     LockGCTask(ClientLockManager mgr) {
504       lockManager = mgr;
505     }
506
507     public void run() {
508       lockManager.runGC();
509     }
510   }
511 }
512
Popular Tags