KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > objectserver > lockmanager > impl > LockManagerImpl


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.objectserver.lockmanager.impl;
6
7 import com.tc.async.api.Sink;
8 import com.tc.exception.ImplementMe;
9 import com.tc.logging.CustomerLogging;
10 import com.tc.logging.TCLogger;
11 import com.tc.logging.TCLogging;
12 import com.tc.net.protocol.tcm.ChannelID;
13 import com.tc.object.lockmanager.api.LockContext;
14 import com.tc.object.lockmanager.api.LockID;
15 import com.tc.object.lockmanager.api.LockLevel;
16 import com.tc.object.lockmanager.api.ThreadID;
17 import com.tc.object.lockmanager.api.WaitContext;
18 import com.tc.object.lockmanager.api.WaitTimer;
19 import com.tc.object.lockmanager.api.WaitTimerCallback;
20 import com.tc.object.lockmanager.impl.WaitTimerImpl;
21 import com.tc.object.net.DSOChannelManager;
22 import com.tc.object.tx.WaitInvocation;
23 import com.tc.objectserver.lockmanager.api.DeadlockChain;
24 import com.tc.objectserver.lockmanager.api.DeadlockResults;
25 import com.tc.objectserver.lockmanager.api.LockEventListener;
26 import com.tc.objectserver.lockmanager.api.LockMBean;
27 import com.tc.objectserver.lockmanager.api.LockManager;
28 import com.tc.objectserver.lockmanager.api.LockManagerMBean;
29 import com.tc.objectserver.lockmanager.api.LockWaitContext;
30 import com.tc.objectserver.lockmanager.api.NotifiedWaiters;
31 import com.tc.objectserver.lockmanager.api.TCIllegalMonitorStateException;
32 import com.tc.util.Assert;
33
34 import java.util.ArrayList JavaDoc;
35 import java.util.Collection JavaDoc;
36 import java.util.HashMap JavaDoc;
37 import java.util.HashSet JavaDoc;
38 import java.util.Iterator JavaDoc;
39 import java.util.List JavaDoc;
40 import java.util.Map JavaDoc;
41
42 /**
43  * Server representation of lock management. We will need to keep track of what locks are checkedout, who has the lock
44  * and who wants the lock
45  *
46  * @author steve
47  */

48 public class LockManagerImpl implements LockManager, LockManagerMBean, WaitTimerCallback {
49   private static final TCLogger logger = TCLogging
50                                                                                  .getLogger(LockManagerImpl.class);
51   private static final TCLogger clogger = CustomerLogging.getDSOGenericLogger();
52
53   public static final LockManagerErrorDescription NOT_STARTING_ERROR = new LockManagerErrorDescription(
54                                                                                                                "NOT STARTING");
55   public static final LockManagerErrorDescription NOT_STARTED_ERROR = new LockManagerErrorDescription(
56                                                                                                                "NOT STARTED");
57   public static final LockManagerErrorDescription IS_STARTING_ERROR = new LockManagerErrorDescription(
58                                                                                                                "IS STARTING");
59   public static final LockManagerErrorDescription IS_STOPPED_ERROR = new LockManagerErrorDescription(
60                                                                                                                "IS STOPPED");
61
62   public static final LockManagerErrorDescription LOCK_ALREADY_GRANTED_ERROR = new LockManagerErrorDescription(
63                                                                                                                "LOCK ALREADY GRANTED");
64
65   public static final int UNINITIALIZED_LOCK_POLICY = 0x00;
66   public static final int GREEDY_LOCK_POLICY = 0x01;
67   public static final int ALTRUISTIC_LOCK_POLICY = 0x02;
68
69   private static final State STARTING = new State("STARTING");
70   private static final State STARTED = new State("STARTED");
71   private static final State STOPPING = new State("STOPPING");
72   private static final State STOPPED = new State("STOPPED");
73
74   private State status = STARTING;
75   private final Map JavaDoc locks = new HashMap JavaDoc();
76   private final LockEventListener lockTimer;
77   private final DSOChannelManager channelManager;
78   private final LockEventListener[] lockListeners;
79   private final WaitTimer waitTimer;
80
81   // XXX: These lock timeout/policy needs to be configurable-- probably per lock...
82
private final long lockTimeout = 1000 * 60 * 2;
83   private int lockPolicy = UNINITIALIZED_LOCK_POLICY;
84   // private int lockPolicy = ALTRUISTIC_LOCK_POLICY;
85

86   private final List JavaDoc lockRequestQueue = new ArrayList JavaDoc();
87   private final ServerThreadContextFactory threadContextFactory = new ServerThreadContextFactory();
88
89   public LockManagerImpl(DSOChannelManager channelManager) {
90     this.channelManager = channelManager;
91
92     // Replacing real lock timer with a null lock timer until the OOP stuff is
93
// put in for real --Orion 2/24/2005
94
// this.lockTimer = new LockTimer(this.channelManager);
95
this.lockTimer = new NullLockTimer();
96     this.lockListeners = new LockEventListener[] { this.lockTimer };
97
98     // This could maybe be combined with the lock timeout stuff, but for now
99
// just use a dedicated Timer instance
100
this.waitTimer = new WaitTimerImpl();
101   }
102
103   public synchronized void dump() {
104     StringBuffer JavaDoc buf = new StringBuffer JavaDoc("LockManager");
105     buf.append("locks=" + locks).append("\n");
106     buf.append("/LockManager").append("\n");
107     System.err.println(buf.toString());
108   }
109
110   public synchronized int getLockCount() {
111     return this.locks.size();
112   }
113
114   public synchronized int getThreadContextCount() {
115     return this.threadContextFactory.getCount();
116   }
117
118   public synchronized void verify(ChannelID channelID, LockID[] lockIDs) {
119     if (!isStarted()) { return; }
120     for (int i = 0; i < lockIDs.length; i++) {
121       Lock lock = (Lock) locks.get(lockIDs[i]);
122       if (lock == null) {
123         String JavaDoc errorMsg = " Lock is not held for " + lockIDs[i] + ". Not by " + channelID
124                           + ". Not by anyone. uhm... Nada";
125         logger.warn(errorMsg);
126         throw new AssertionError JavaDoc(errorMsg);
127       }
128       if (!lock.holdsSomeLock(channelID)) { throw new AssertionError JavaDoc(" Lock " + lockIDs[i]
129                                                                      + " is not held by anyone in " + channelID); }
130     }
131   }
132
133   public synchronized void reestablishLock(LockID lockID, ChannelID channelID, ThreadID sourceID, int requestedLevel,
134                                            Sink lockResponseSink) {
135     assertStarting();
136     ServerThreadContext threadContext = threadContextFactory.getOrCreate(channelID, sourceID);
137     Lock lock = (Lock) this.locks.get(lockID);
138
139     if (lock == null) {
140       lock = new Lock(lockID, threadContext, this.lockTimeout, this.lockListeners, this.lockPolicy,
141                       threadContextFactory);
142       locks.put(lockID, lock);
143     }
144     lock.reestablishLock(threadContext, requestedLevel, lockResponseSink);
145     /*
146      * if (!basicRequestLock(lockID, channelID, threadID, requestedLevel, lockResponseSink)) { // formatter throw new
147      * LockManagerError(LOCK_ALREADY_GRANTED_ERROR, "Attempt to reestablish a lock failed. " + "Another client may have
148      * already reestablished this lock: " + "lockID=" + lockID + ", channelID=" + channelID + ", threadID=" + threadID + ",
149      * requestedLevel=" + requestedLevel); }
150      */

151   }
152
153   public synchronized boolean tryRequestLock(LockID lockID, ChannelID channelID, ThreadID sourceID, int requestedLevel,
154                                              Sink lockResponseSink) {
155     return requestLock(lockID, channelID, sourceID, requestedLevel, lockResponseSink, true);
156   }
157
158   private synchronized boolean requestLock(LockID lockID, ChannelID channelID, ThreadID threadID, int requestedLevel,
159                                            Sink lockResponseSink, boolean noBlock) {
160     if (!channelManager.isActiveID(channelID)) return false;
161     if (isStarting()) {
162       queueRequestLock(lockID, channelID, threadID, requestedLevel, lockResponseSink, noBlock);
163       return false;
164     }
165     if (!isStarted()) return false;
166     return basicRequestLock(lockID, channelID, threadID, requestedLevel, lockResponseSink, noBlock);
167   }
168
169   public synchronized boolean requestLock(LockID lockID, ChannelID channelID, ThreadID sourceID, int requestedLevel,
170                                           Sink lockResponseSink) {
171     return requestLock(lockID, channelID, sourceID, requestedLevel, lockResponseSink, false);
172   }
173
174   private boolean basicRequestLock(LockID lockID, ChannelID channelID, ThreadID threadID, int requestedLevel,
175                                    Sink lockResponseSink, boolean noBlock) {
176     ServerThreadContext threadContext = threadContextFactory.getOrCreate(channelID, threadID);
177     Lock lock = (Lock) this.locks.get(lockID);
178
179     if (lock != null) {
180       if (noBlock) {
181         return lock.tryRequestLock(threadContext, requestedLevel, lockResponseSink);
182       } else {
183         return lock.requestLock(threadContext, requestedLevel, lockResponseSink);
184       }
185     } else {
186       lock = new Lock(lockID, threadContext, requestedLevel, lockResponseSink, this.lockTimeout, this.lockListeners,
187                       this.lockPolicy, threadContextFactory);
188       locks.put(lockID, lock);
189       return true;
190     }
191   }
192
193   private void queueRequestLock(LockID lockID, ChannelID channelID, ThreadID threadID, int requestedLevel,
194                                 Sink lockResponseSink, boolean noBlock) {
195     lockRequestQueue
196         .add(new RequestLockContext(lockID, channelID, threadID, requestedLevel, lockResponseSink, noBlock));
197   }
198
199   public synchronized void queryLock(LockID lockID, ChannelID channelID, ThreadID threadID, Sink lockResponseSink) {
200     assertNotStarting();
201     if (!isStarted()) return;
202
203     Lock lock = getLockFor(lockID);
204     ServerThreadContext threadContext = threadContextFactory.getOrCreate(channelID, threadID);
205     lock.queryLock(threadContext, lockResponseSink);
206   }
207   
208   public synchronized void interrupt(LockID lockID, ChannelID channelID, ThreadID threadID) {
209     assertNotStarting();
210     if (!isStarted()) return;
211     
212     Lock lock = getLockFor(lockID);
213     ServerThreadContext threadContext = threadContextFactory.getOrCreate(channelID, threadID);
214     lock.interrupt(threadContext);
215   }
216
217   public synchronized void unlock(LockID id, ChannelID channelID, ThreadID threadID) {
218     assertNotStarting();
219     if (!isStarted()) return;
220
221     Lock l = getLockFor(id);
222     if (l.isNull()) {
223       logger.warn("An attempt was made to unlock:" + id + " for channelID:" + channelID
224                   + " This lock was not held. This could be do to that node being down so it may not be an error.");
225       return;
226     }
227     basicUnlock(l, threadContextFactory.getOrCreate(channelID, threadID));
228   }
229
230   public synchronized void wait(LockID lid, ChannelID cid, ThreadID tid, WaitInvocation call, Sink lockResponseSink) {
231     assertNotStopped();
232     Lock lock = (Lock) this.locks.get(lid);
233     if (lock != null) {
234       ServerThreadContext threadContext = threadContextFactory.getOrCreate(cid, tid);
235       try {
236         lock.wait(threadContext, waitTimer, call, this, lockResponseSink);
237         notifyAll();
238       } catch (TCIllegalMonitorStateException e) {
239         e.printStackTrace();
240         // XXX: send error response back to client
241
throw new ImplementMe();
242       }
243     } else {
244       // XXX: lock doesn't exist...this is bad ;-)
245
throw new ImplementMe();
246     }
247   }
248
249   public synchronized void reestablishWait(LockID lid, ChannelID cid, ThreadID tid, int lockLevel, WaitInvocation call,
250                                            Sink lockResponseSink) {
251     assertStarting();
252     Lock lock = (Lock) this.locks.get(lid);
253     ServerThreadContext threadContext = threadContextFactory.getOrCreate(cid, tid);
254     if (lock == null) {
255       lock = new Lock(lid, threadContext, this.lockTimeout, this.lockListeners, this.lockPolicy, threadContextFactory);
256       locks.put(lid, lock);
257     }
258     lock.reestablishWait(threadContext, call, lockLevel, lockResponseSink);
259   }
260
261   public synchronized void recallCommit(LockID lid, ChannelID cid, Collection JavaDoc lockContexts, Collection JavaDoc waitContexts,
262                                         Collection JavaDoc pendingLockContexts, Sink lockResponseSink) {
263     assertNotStarting();
264     Lock lock = (Lock) this.locks.get(lid);
265     Assert.assertNotNull(lock);
266
267     synchronized (lock) {
268       for (Iterator JavaDoc i = lockContexts.iterator(); i.hasNext();) {
269         LockContext ctxt = (LockContext) i.next();
270         ServerThreadContext threadContext = threadContextFactory.getOrCreate(cid, ctxt.getThreadID());
271         lock.addRecalledHolder(threadContext, ctxt.getLockLevel());
272       }
273
274       for (Iterator JavaDoc i = waitContexts.iterator(); i.hasNext();) {
275         WaitContext ctxt = (WaitContext) i.next();
276         ServerThreadContext threadContext = threadContextFactory.getOrCreate(cid, ctxt.getThreadID());
277         lock.addRecalledWaiter(threadContext, ctxt.getWaitInvocation(), ctxt.getLockLevel(), lockResponseSink,
278                                waitTimer, this);
279       }
280
281       for (Iterator JavaDoc i = pendingLockContexts.iterator(); i.hasNext();) {
282         LockContext ctxt = (LockContext) i.next();
283         ServerThreadContext threadContext = threadContextFactory.getOrCreate(cid, ctxt.getThreadID());
284         lock.addRecalledPendingRequest(threadContext, ctxt.getLockLevel(), lockResponseSink);
285       }
286
287       ServerThreadContext threadContext = threadContextFactory.getOrCreate(cid, ThreadID.VM_ID);
288       if (lock.recallCommit(threadContext)) {
289         locks.remove(lid);
290         threadContextFactory.removeIfClear(threadContext);
291       }
292     }
293   }
294
295   public synchronized void waitTimeout(Object JavaDoc callbackObject) {
296     if (isStarted() && callbackObject instanceof LockWaitContext) {
297       LockWaitContext context = (LockWaitContext) callbackObject;
298       context.waitTimeout();
299     } else {
300       logger.warn("Ignoring wait timeout for : " + callbackObject);
301     }
302   }
303
304   public synchronized void notify(LockID lid, ChannelID cid, ThreadID tid, boolean all,
305                                   NotifiedWaiters addNotifiedWaitersTo) {
306     // assertStarted();
307
if (!isStarted()) {
308       if (isStarting()) {
309         throw new AssertionError JavaDoc("Notify was called before the LockManager was started.");
310       } else {
311         logger.warn("Notify was called after shutdown sequence commenced.");
312       }
313     }
314     Lock lock = (Lock) this.locks.get(lid);
315     if (lock != null) {
316       ServerThreadContext threadContext = threadContextFactory.getOrCreate(cid, tid);
317       try {
318         lock.notify(threadContext, all, addNotifiedWaitersTo);
319         if (false) System.err.println("LockManager.notify(" + lid + ", " + cid + ", " + tid + ", all=" + all
320                                       + ", notifiedWaiters=" + addNotifiedWaitersTo);
321       } catch (TCIllegalMonitorStateException e) {
322         e.printStackTrace();
323         throw new AssertionError JavaDoc(e);
324       }
325     } else {
326       throw new AssertionError JavaDoc("Lock :" + lid + " is not present !");
327     }
328   }
329
330   private void basicUnlock(Lock lock, ServerThreadContext threadContext) {
331     boolean wasUpgrade = lock.removeCurrentHold(threadContext);
332     if (isStarted()) {
333       if (wasUpgrade) {
334         lock.awardAllReads();
335       } else {
336         boolean clear = lock.nextPending();
337         if (clear) {
338           locks.remove(lock.getLockID());
339         }
340       }
341     }
342     threadContextFactory.removeIfClear(threadContext);
343     notifyAll();
344   }
345
346   public synchronized boolean hasPending(LockID id) {
347     return getLockFor(id).hasPending();
348   }
349
350   public synchronized void clearAllLocksFor(ChannelID channelID) {
351     HashSet JavaDoc allLocks = new HashSet JavaDoc(locks.keySet());
352
353     // NOTE: These loops might be too expensive as lock and threadContext sets grow large
354
// We could keep indexes based on channel to speed things up
355

356     for (Iterator JavaDoc i = allLocks.iterator(); i.hasNext();) {
357       LockID lid = (LockID) i.next();
358       Lock lock = getLockFor(lid);
359
360       if (!lock.isNull()) {
361         lock.clearStateForChannel(channelID);
362
363         // This gets the next pending lock (if any) awarded
364
basicUnlock(lock, ServerThreadContext.NULL_CONTEXT);
365       }
366     }
367     threadContextFactory.clear(channelID);
368   }
369
370   private Lock getLockFor(LockID id) {
371     Lock lock = (Lock) locks.get(id);
372     if (lock == null) return Lock.NULL_LOCK;
373     return lock;
374   }
375
376   public synchronized void scanForDeadlocks(DeadlockResults output) {
377     new DeadlockDetector(output).detect(threadContextFactory.getView().iterator());
378   }
379
380   public DeadlockChain[] scanForDeadlocks() {
381     final List JavaDoc chains = new ArrayList JavaDoc();
382     DeadlockResults results = new DeadlockResults() {
383       public void foundDeadlock(DeadlockChain chain) {
384         chains.add(chain);
385       }
386     };
387
388     scanForDeadlocks(results);
389
390     return (DeadlockChain[]) chains.toArray(new DeadlockChain[chains.size()]);
391   }
392
393   public LockMBean[] getAllLocks() {
394     final List JavaDoc copy;
395     synchronized (this) {
396       copy = new ArrayList JavaDoc(locks.size());
397       copy.addAll(locks.values());
398     }
399
400     int count = 0;
401     LockMBean[] rv = new LockMBean[copy.size()];
402     for (Iterator JavaDoc i = copy.iterator(); i.hasNext();) {
403       Lock lock = (Lock) i.next();
404       rv[count++] = lock.getMBean(channelManager);
405     }
406
407     return rv;
408   }
409
410   public void start() {
411     synchronized (this) {
412       assertStarting();
413       changeState(STARTED);
414       if (lockPolicy == UNINITIALIZED_LOCK_POLICY) {
415         this.lockPolicy = GREEDY_LOCK_POLICY;
416       }
417
418       logger.debug("START Locks re-established -- " + locks.size());
419       for (Iterator JavaDoc i = locks.values().iterator(); i.hasNext();) {
420         Lock lock = (Lock) i.next();
421         lock.setLockPolicy(lockPolicy);
422         lock.notifyStarted(this, waitTimer);
423       }
424
425       for (Iterator JavaDoc i = lockRequestQueue.iterator(); i.hasNext();) {
426         RequestLockContext ctxt = (RequestLockContext) i.next();
427         requestLock(ctxt.lockID, ctxt.channelID, ctxt.threadID, ctxt.requestedLockLevel, ctxt.lockResponseSink,
428                     ctxt.noBlock);
429       }
430       lockRequestQueue.clear();
431     }
432   }
433
434   public synchronized void stop() throws InterruptedException JavaDoc {
435     while (isStarting())
436       wait();
437     assertStarted();
438     cinfo("Stopping...");
439     changeState(STOPPING);
440
441     locks.clear();
442     threadContextFactory.clear();
443
444     if (waitTimer != null) {
445       waitTimer.shutdown();
446     }
447     setLockPolicy(ALTRUISTIC_LOCK_POLICY);
448
449     changeState(STOPPED);
450     cinfo("Stopped.");
451
452   }
453
454   public int getLockPolicy() {
455     return lockPolicy;
456   }
457
458   public void setLockPolicy(int lockPolicy) {
459     Assert.assertTrue(lockPolicy == GREEDY_LOCK_POLICY || lockPolicy == ALTRUISTIC_LOCK_POLICY);
460     this.lockPolicy = lockPolicy;
461     for (Iterator JavaDoc i = locks.values().iterator(); i.hasNext();) {
462       Lock lock = (Lock) i.next();
463       lock.setLockPolicy(this.lockPolicy);
464     }
465   }
466
467   private void cinfo(Object JavaDoc message) {
468     clogger.debug("Lock Manager: " + message);
469   }
470
471   private void changeState(State s) {
472     this.status = s;
473     notifyAll();
474   }
475
476   private boolean isStopped() {
477     return status == STOPPED;
478   }
479
480   private boolean isStarted() {
481     return status == STARTED;
482   }
483
484   private boolean isStarting() {
485     return status == STARTING;
486   }
487
488   private void assertStarting() {
489     if (!isStarting()) throw new LockManagerError(NOT_STARTING_ERROR, "LockManager is not starting ("
490                                                                       + this.status.getName() + ")");
491   }
492
493   private void assertStarted() {
494     if (!isStarted()) throw new LockManagerError(NOT_STARTED_ERROR, "LockManager is not started ("
495                                                                     + this.status.getName() + ")");
496   }
497
498   private void assertNotStarting() {
499     if (isStarting()) throw new LockManagerError(IS_STARTING_ERROR, "LockManager is starting");
500   }
501
502   private void assertNotStopped() {
503     if (isStopped()) throw new LockManagerError(IS_STOPPED_ERROR, "LockManager is stopped");
504   }
505
506   public static class LockManagerError extends Error JavaDoc {
507
508     private final LockManagerErrorDescription desc;
509
510     private LockManagerError(LockManagerErrorDescription desc, String JavaDoc msg) {
511       super(msg);
512       this.desc = desc;
513     }
514
515     public LockManagerErrorDescription getDescription() {
516       return this.desc;
517     }
518   }
519
520   public static class LockManagerErrorDescription {
521     private final String JavaDoc name;
522
523     private LockManagerErrorDescription(String JavaDoc name) {
524       this.name = name;
525     }
526
527     public String JavaDoc toString() {
528       return getClass().getName() + "[" + this.name + "]";
529     }
530   }
531
532   private static class RequestLockContext {
533     final LockID lockID;
534     final ChannelID channelID;
535     final ThreadID threadID;
536     final int requestedLockLevel;
537     final boolean noBlock;
538     final Sink lockResponseSink;
539
540     private RequestLockContext(LockID lockID, ChannelID channelID, ThreadID threadID, int requestedLockLevel,
541                                Sink lockResponseSink, boolean noBlock) {
542       this.lockID = lockID;
543       this.channelID = channelID;
544       this.threadID = threadID;
545       this.requestedLockLevel = requestedLockLevel;
546       this.lockResponseSink = lockResponseSink;
547       this.noBlock = noBlock;
548     }
549
550     public String JavaDoc toString() {
551       return "RequestLockContext [ " + lockID + "," + channelID + "," + threadID + ","
552              + LockLevel.toString(requestedLockLevel) + ", " + noBlock + " ]";
553     }
554   }
555
556   private static class State {
557     private final String JavaDoc name;
558
559     private State(String JavaDoc name) {
560       this.name = name;
561     }
562
563     public String JavaDoc getName() {
564       return this.name;
565     }
566
567     public String JavaDoc toString() {
568       return getClass().getName() + "[" + this.name + "]";
569     }
570   }
571
572 }
573
Popular Tags