KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > objectserver > lockmanager > impl > Lock


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.objectserver.lockmanager.impl;
6
7 import org.apache.commons.collections.map.ListOrderedMap;
8
9 import com.tc.async.api.Sink;
10 import com.tc.exception.TCInternalError;
11 import com.tc.logging.TCLogger;
12 import com.tc.logging.TCLogging;
13 import com.tc.net.protocol.tcm.ChannelID;
14 import com.tc.object.lockmanager.api.LockContext;
15 import com.tc.object.lockmanager.api.LockID;
16 import com.tc.object.lockmanager.api.LockLevel;
17 import com.tc.object.lockmanager.api.ServerThreadID;
18 import com.tc.object.lockmanager.api.ThreadID;
19 import com.tc.object.lockmanager.api.WaitTimer;
20 import com.tc.object.lockmanager.api.WaitTimerCallback;
21 import com.tc.object.net.DSOChannelManager;
22 import com.tc.object.tx.WaitInvocation;
23 import com.tc.objectserver.context.LockResponseContext;
24 import com.tc.objectserver.lockmanager.api.LockEventListener;
25 import com.tc.objectserver.lockmanager.api.LockHolder;
26 import com.tc.objectserver.lockmanager.api.LockMBean;
27 import com.tc.objectserver.lockmanager.api.LockWaitContext;
28 import com.tc.objectserver.lockmanager.api.NotifiedWaiters;
29 import com.tc.objectserver.lockmanager.api.ServerLockRequest;
30 import com.tc.objectserver.lockmanager.api.TCIllegalMonitorStateException;
31 import com.tc.objectserver.lockmanager.api.Waiter;
32 import com.tc.util.Assert;
33
34 import java.util.ArrayList JavaDoc;
35 import java.util.Collection JavaDoc;
36 import java.util.Collections JavaDoc;
37 import java.util.HashMap JavaDoc;
38 import java.util.Iterator JavaDoc;
39 import java.util.LinkedList JavaDoc;
40 import java.util.List JavaDoc;
41 import java.util.Map JavaDoc;
42 import java.util.TimerTask JavaDoc;
43
44 public class Lock {
45   private static final TCLogger logger = TCLogging.getLogger(Lock.class);
46   public final static Lock NULL_LOCK = new Lock(LockID.NULL_ID, 0,
47                                                                           new LockEventListener[] {}, true,
48                                                                           LockManagerImpl.ALTRUISTIC_LOCK_POLICY,
49                                                                           ServerThreadContextFactory.DEFAULT_FACTORY);
50
51   private static final int UPGRADE = LockLevel.READ | LockLevel.WRITE;
52
53   private final LockEventListener[] listeners;
54   private final Map greedyHolders = new HashMap JavaDoc();
55   private final Map holders = new HashMap JavaDoc();
56   private final List JavaDoc pendingLockRequests = new LinkedList JavaDoc();
57   private final List JavaDoc pendingLockUpgrades = new LinkedList JavaDoc();
58   private final ListOrderedMap waiters = new ListOrderedMap();
59   private final Map waitTimers = new HashMap JavaDoc();
60   private final LockID lockID;
61   private final long timeout;
62   private final boolean isNull;
63   private int level;
64   private boolean recalled = false;
65
66   private int lockPolicy;
67   private final ServerThreadContextFactory threadContextFactory;
68
69   // real constructor used by lock manager
70
/**
 * Creates a non-null lock and immediately requests it for {@code txn} through
 * {@link #requestLock(ServerThreadContext, int, Sink)}.
 *
 * @param lockID identity of the lock
 * @param txn thread context making the initial request
 * @param lockLevel level of the initial request
 * @param lockResponseSink sink that receives the response to the initial request
 * @param timeout value handed to every Holder created by this lock
 * @param listeners listeners notified of awards and of newly pending requests
 * @param lockPolicy initial lock policy
 * @param threadContextFactory factory used to obtain the per-client VM thread context
 */
Lock(LockID lockID, ServerThreadContext txn, int lockLevel, Sink lockResponseSink, long timeout,
     LockEventListener[] listeners, int lockPolicy, ServerThreadContextFactory threadContextFactory) {
  this(lockID, timeout, listeners, false, lockPolicy, threadContextFactory);
  this.requestLock(txn, lockLevel, lockResponseSink);
}
75
76   // real constructor used by lock manager when re-establishing waits and lock holds on
77
// restart.
78
/**
 * Creates a non-null lock without requesting it for anyone.
 * The {@code txn} argument is accepted but not used by this constructor.
 *
 * @param lockID identity of the lock
 * @param txn unused here
 * @param timeout value handed to every Holder created by this lock
 * @param listeners listeners notified of awards and of newly pending requests
 * @param lockPolicy initial lock policy
 * @param threadContextFactory factory used to obtain the per-client VM thread context
 */
Lock(LockID lockID, ServerThreadContext txn, long timeout, LockEventListener[] listeners, int lockPolicy,
     ServerThreadContextFactory threadContextFactory) {
  this(lockID, timeout, listeners, false, lockPolicy, threadContextFactory);
}
82
83   // for tests
84
/**
 * Creates a non-null lock using the altruistic policy and the default thread context factory.
 *
 * @param lockID identity of the lock
 * @param timeout value handed to every Holder created by this lock
 * @param listeners listeners notified of awards and of newly pending requests
 */
Lock(LockID lockID, long timeout, LockEventListener[] listeners) {
  this(lockID, timeout, listeners, false, LockManagerImpl.ALTRUISTIC_LOCK_POLICY,
       ServerThreadContextFactory.DEFAULT_FACTORY);
}
88
/**
 * Base constructor every other constructor delegates to; it only stores its arguments.
 *
 * @param isNull true only for the sentinel instance
 */
private Lock(LockID lockID, long timeout, LockEventListener[] listeners, boolean isNull, int lockPolicy,
             ServerThreadContextFactory threadContextFactory) {
  this.isNull = isNull;
  this.lockID = lockID;
  this.timeout = timeout;
  this.lockPolicy = lockPolicy;
  this.listeners = listeners;
  this.threadContextFactory = threadContextFactory;
}
98
99   static LockResponseContext createLockRejectedResponseContext(LockID lockID, ServerThreadID threadID, int level) {
100     return new LockResponseContext(lockID, threadID.getChannelID(), threadID.getClientThreadID(), level,
101                                    LockResponseContext.LOCK_NOT_AWARDED);
102   }
103
104   static LockResponseContext createLockAwardResponseContext(LockID lockID, ServerThreadID threadID, int level) {
105     return new LockResponseContext(lockID, threadID.getChannelID(), threadID.getClientThreadID(), level,
106                                    LockResponseContext.LOCK_AWARD);
107   }
108
109   static LockResponseContext createLockRecallResponseContext(LockID lockID, ServerThreadID threadID, int level) {
110     return new LockResponseContext(lockID, threadID.getChannelID(), threadID.getClientThreadID(), level,
111                                    LockResponseContext.LOCK_RECALL);
112   }
113
114   static LockResponseContext createLockWaitTimeoutResponseContext(LockID lockID, ServerThreadID threadID, int level) {
115     return new LockResponseContext(lockID, threadID.getChannelID(), threadID.getClientThreadID(), level,
116                                    LockResponseContext.LOCK_WAIT_TIMEOUT);
117   }
118
119   static LockResponseContext createLockQueriedResponseContext(LockID lockID, ServerThreadID threadID, int level,
120                                                               int lockRequestQueueLength, int lockUpgradeQueueLength,
121                                                               Collection JavaDoc greedyHolders, Collection JavaDoc holders,
122                                                               Collection JavaDoc waiters) {
123     return new LockResponseContext(lockID, threadID.getChannelID(), threadID.getClientThreadID(), level,
124                                    lockRequestQueueLength, lockUpgradeQueueLength, greedyHolders, holders, waiters,
125                                    LockResponseContext.LOCK_INFO);
126   }
127
128   synchronized LockMBean getMBean(DSOChannelManager channelManager) {
129     int count;
130     LockHolder[] holds = new LockHolder[this.holders.size()];
131     ServerLockRequest[] reqs = new ServerLockRequest[this.pendingLockRequests.size()];
132     ServerLockRequest[] upgrades = new ServerLockRequest[this.pendingLockUpgrades.size()];
133     Waiter[] waits = new Waiter[this.waiters.size()];
134
135     count = 0;
136     for (Iterator JavaDoc i = this.holders.values().iterator(); i.hasNext();) {
137       Holder h = (Holder) i.next();
138       ChannelID cid = h.getChannelID();
139       holds[count++] = new LockHolder(cid, channelManager.getChannelAddress(cid), h.getThreadID(), h.getLockLevel(), h
140           .getTimestamp());
141     }
142
143     count = 0;
144     for (Iterator JavaDoc i = this.pendingLockRequests.iterator(); i.hasNext();) {
145       Request r = (Request) i.next();
146       ChannelID cid = r.getRequesterID();
147       reqs[count++] = new ServerLockRequest(cid, channelManager.getChannelAddress(cid), r.getSourceID(), r
148           .getLockLevel(), r.getTimestamp());
149     }
150
151     count = 0;
152     for (Iterator JavaDoc i = this.pendingLockUpgrades.iterator(); i.hasNext();) {
153       Request r = (Request) i.next();
154       ChannelID cid = r.getRequesterID();
155       upgrades[count++] = new ServerLockRequest(cid, channelManager.getChannelAddress(cid), r.getSourceID(), r
156           .getLockLevel(), r.getTimestamp());
157     }
158
159     count = 0;
160     for (Iterator JavaDoc i = this.waiters.values().iterator(); i.hasNext();) {
161       LockWaitContext wc = (LockWaitContext) i.next();
162       ChannelID cid = wc.getChannelID();
163       waits[count++] = new Waiter(cid, channelManager.getChannelAddress(cid), wc.getThreadID(), wc.getWaitInvocation(),
164                                   wc.getTimestamp());
165     }
166
167     return new LockMBeanImpl(lockID, holds, reqs, upgrades, waits);
168   }
169
170   synchronized void queryLock(ServerThreadContext txn, Sink lockResponseSink) {
171     if (!hasGreedyHolders()) {
172       lockResponseSink.add(createLockQueriedResponseContext(this.lockID, txn.getId(), this.level,
173                                                             this.pendingLockRequests.size(), this.pendingLockUpgrades
174                                                                 .size(), this.greedyHolders.values(), this.holders
175                                                                 .values(), this.waiters.values()));
176     } else {
177       // TODO:
178
// The Remote Lock Manager needs to ask the client for lock information when greedy lock is awarded.
179
// Currently, the Remote Lock Manager responds to queryLock by looking at the server only.
180
lockResponseSink.add(createLockQueriedResponseContext(this.lockID, txn.getId(), this.level,
181                                                             this.pendingLockRequests.size(), this.pendingLockUpgrades
182                                                                 .size(), this.greedyHolders.values(), this.holders
183                                                                 .values(), this.waiters.values()));
184     }
185   }
186
187   synchronized boolean tryRequestLock(ServerThreadContext txn, int requestedLockLevel, Sink lockResponseSink) {
188     return requestLock(txn, requestedLockLevel, lockResponseSink, true);
189   }
190
191   synchronized boolean requestLock(ServerThreadContext txn, int requestedLockLevel, Sink lockResponseSink) {
192     return requestLock(txn, requestedLockLevel, lockResponseSink, false);
193   }
194
195   // XXX:: UPGRADE Requests can come in with requestLockLevel == UPGRADE on a notified wait during server crash
196
private synchronized boolean requestLock(ServerThreadContext txn, int requestedLockLevel, Sink lockResponseSink,
197                                            boolean noBlock) {
198
199     // debug("requestLock - BEGIN -", txn, ",", LockLevel.toString(requestedLockLevel));
200
// it is an error (probably originating from the client side) to
201
// request a lock you already hold
202
Holder holder = getHolder(txn);
203     if (noBlock && holder == null && (getHoldersCount() > 0 || hasGreedyHolders())) {
204       cannotAwardAndRespond(txn, requestedLockLevel, lockResponseSink);
205       return false;
206     }
207
208     if (holder != null) {
209       if (LockLevel.NIL_LOCK_LEVEL != (holder.getLockLevel() & requestedLockLevel)) {
210         // formatting
211
throw new AssertionError JavaDoc("Client requesting already held lock! holder=" + holder + ", lock=" + this);
212       }
213     }
214     if (waiters.containsKey(txn)) throw new AssertionError JavaDoc("Attempt to request a lock in a Thread "
215                                                            + "that is already part of the wait set. lock = " + this);
216
217     if (isPolicyGreedy()) {
218       if (canAwardGreedilyOnTheClient(txn, requestedLockLevel)) {
219         // These requests are the ones in the wire when the greedy lock was given out to the client.
220
// We can safely ignore it as the clients will be able to award it locally.
221
logger.debug(lockID + " : Lock.requestLock() : Ignoring the Lock request(" + txn + ","
222                      + LockLevel.toString(requestedLockLevel)
223                      + ") message from the a client that has the lock greedily.");
224         return false;
225       } else if (recalled) {
226         // add to pending until recall process is complete, those who hold the lock greedily will send the
227
// pending state during recall commit.
228
if (!holdsGreedyLock(txn)) {
229           addPending(txn, requestedLockLevel, lockResponseSink);
230         }
231         return false;
232       }
233     }
234
235     // Lock granting logic:
236
// 0. If no one is holding this lock, go ahead and award it
237
// 1. If only a read lock is held and no write locks are pending, and another read
238
// (and only read) lock is requested, award it. If Write locks are pending, we dont want to
239
// starve the WRITES by keeping on awarding READ Locks.
240
// 2. If there is only one holder, that hold is a read lock, the holder is
241
// the requestor, and the requestor wants a write lock...then this is an
242
// awardable lock upgrade
243
// 3. Else the request must be queued (ie. added to pending list)
244

245     if ((getHoldersCount() == 0) || ((!hasPending()) && ((requestedLockLevel == LockLevel.READ) && this.isRead()))) {
246       // (0, 1) uncontended or additional read lock
247
if (isPolicyGreedy() && (requestedLockLevel != UPGRADE)
248           && ((requestedLockLevel == LockLevel.READ) || (getWaiterCount() == 0))) {
249         awardGreedyAndRespond(txn, requestedLockLevel, lockResponseSink);
250       } else {
251         awardAndRespond(txn, requestedLockLevel, lockResponseSink);
252       }
253     } else if ((getHoldersCount() == 1) && holdsReadLock(txn) && LockLevel.isWrite(requestedLockLevel)) {
254       // (2) allowed lock upgrade
255
if (isPolicyGreedy() && isGreedyRequest(txn)) {
256         // XXX::Currently Greedy upgrades are not supported. Client never does a greedy request.
257
requestedLockLevel = LockLevel.makeGreedy(requestedLockLevel);
258       }
259       awardAndRespond(txn, requestedLockLevel, lockResponseSink);
260     } else {
261       // (3) queue request
262
if (isPolicyGreedy() && hasGreedyHolders()) {
263         recall(requestedLockLevel);
264       }
265       if (!holdsGreedyLock(txn)) {
266         addPending(txn, requestedLockLevel, lockResponseSink);
267       }
268       return false;
269     }
270
271     return true;
272   }
273
274   synchronized void addRecalledHolder(ServerThreadContext txn, int lockLevel) {
275     // debug("addRecalledHolder - BEGIN -", txn, ",", LockLevel.toString(lockLevel));
276
if (!LockLevel.isWrite(level) && LockLevel.isWrite(lockLevel)) {
277       // Client issued a WRITE lock without holding a GREEDY WRITE. Bug in the client.
278
throw new AssertionError JavaDoc("Client issued a WRITE lock without holding a GREEDY WRITE !");
279     }
280     awardLock(txn, lockLevel);
281     if (LockLevel.isRead(lockLevel) && pendingLockRequests.size() > 0) {
282       // Check to see if we have any lock request for this Thread that needs to go to lock upgrade
283
for (Iterator JavaDoc iter = pendingLockRequests.iterator(); iter.hasNext();) {
284         Request request = (Request) iter.next();
285         if (request.getThreadContext().equals(txn) && LockLevel.isWrite(request.getLockLevel())) {
286           iter.remove();
287           pendingLockUpgrades.add(request);
288           break;
289         }
290       }
291     }
292   }
293
294   synchronized void addRecalledPendingRequest(ServerThreadContext txn, int lockLevel, Sink lockResponseSink) {
295     // debug("addRecalledPendingRequest - BEGIN -", txn, ",", LockLevel.toString(lockLevel));
296
addPending(txn, lockLevel, lockResponseSink);
297   }
298
299   private synchronized void recall(int recallLevel) {
300     if (recalled) { return; }
301     for (Iterator JavaDoc i = greedyHolders.values().iterator(); i.hasNext();) {
302       Holder holder = (Holder) i.next();
303       // debug("recall() - issued for -", holder);
304
holder.getSink().add(
305                            createLockRecallResponseContext(holder.getLockID(), holder.getThreadContext().getId(),
306                                                            recallLevel));
307       recalled = true;
308     }
309   }
310
311   private boolean isGreedyRequest(ServerThreadContext txn) {
312     return (txn.getId().getClientThreadID().equals(ThreadID.VM_ID));
313   }
314
315   private boolean isPolicyGreedy() {
316     return lockPolicy == LockManagerImpl.GREEDY_LOCK_POLICY;
317   }
318
319   int getLockPolicy() {
320     return lockPolicy;
321   }
322
323   void setLockPolicy(int newPolicy) {
324     if (!isNull() && newPolicy != lockPolicy) {
325       this.lockPolicy = newPolicy;
326       if (!isPolicyGreedy()) {
327         recall(LockLevel.WRITE);
328       }
329     }
330   }
331
332   private void awardGreedyAndRespond(ServerThreadContext txn, int requestedLockLevel, Sink lockResponseSink) {
333     // debug("awardGreedyAndRespond() - BEGIN - ", txn, ",", LockLevel.toString(requestedLockLevel));
334
final ServerThreadContext clientTx = getClientVMContext(txn);
335     final int greedyLevel = LockLevel.makeGreedy(requestedLockLevel);
336
337     ChannelID ch = txn.getId().getChannelID();
338     checkAndClearStateOnGreedyAward(ch, requestedLockLevel);
339     awardAndRespond(clientTx, greedyLevel, lockResponseSink);
340     Holder holder = getHolder(clientTx);
341     holder.setSink(lockResponseSink);
342     greedyHolders.put(ch, holder);
343     clearWaitingOn(txn);
344   }
345
346   private void cannotAwardAndRespond(ServerThreadContext txn, int requestedLockLevel, Sink lockResponseSink) {
347     lockResponseSink.add(createLockRejectedResponseContext(this.lockID, txn.getId(), requestedLockLevel));
348   }
349
350   private void awardAndRespond(ServerThreadContext txn, int requestedLockLevel, Sink lockResponseSink) {
351     // debug("awardRespond() - BEGIN - ", txn, ",", LockLevel.toString(requestedLockLevel));
352
awardLock(txn, requestedLockLevel);
353     lockResponseSink.add(createLockAwardResponseContext(this.lockID, txn.getId(), requestedLockLevel));
354   }
355
356   synchronized void notify(ServerThreadContext txn, boolean all, NotifiedWaiters addNotifiedWaitersTo)
357       throws TCIllegalMonitorStateException {
358     // debug("notify() - BEGIN - ", txn, ", all = " + all);
359
if (waiters.containsKey(txn)) { throw Assert.failure("Can't notify self: " + txn); }
360     checkLegalWaitNotifyState(txn);
361
362     if (waiters.size() > 0) {
363       final int numToNotify = all ? waiters.size() : 1;
364       for (int i = 0; i < numToNotify; i++) {
365         LockWaitContext wait = (LockWaitContext) waiters.remove(0);
366         removeAndCancelWaitTimer(wait);
367         createPendingFromWaiter(wait);
368         addNotifiedWaitersTo.addNotification(new LockContext(lockID, wait.getChannelID(), wait.getThreadID(), wait.lockLevel()));
369       }
370     }
371   }
372
373   synchronized void interrupt(ServerThreadContext txn) {
374     if (waiters.size() == 0 || !waiters.containsKey(txn)) {
375       logger.warn("Cannot interrupt: " + txn + " is not waiting.");
376       return;
377     }
378     LockWaitContext wait = (LockWaitContext)waiters.remove(txn);
379     removeAndCancelWaitTimer(wait);
380     createPendingFromWaiter(wait);
381   }
382
383   private void removeAndCancelWaitTimer(LockWaitContext wait) {
384     TimerTask JavaDoc task = (TimerTask JavaDoc) waitTimers.remove(wait);
385     if (task != null) task.cancel();
386   }
387
388   private void createPendingFromWaiter(LockWaitContext wait) {
389     // XXX: This cast to WaitContextImpl is lame. I'm not sure how to refactor it right now.
390
Request request = new Request(((LockWaitContextImpl) wait).getThreadContext(), wait.lockLevel(), wait
391         .getLockResponseSink());
392     pendingLockRequests.add(request);
393     if (isPolicyGreedy() && hasGreedyHolders()) {
394       recall(LockLevel.WRITE);
395     }
396   }
397
398   synchronized void waitTimeout(LockWaitContext context) {
399
400     // debug("waitTimeout() - BEGIN -", context);
401
// XXX: This cast is gross, too.
402
ServerThreadContext txn = ((LockWaitContextImpl) context).getThreadContext();
403     Object JavaDoc removed = waiters.remove(txn);
404
405     if (removed != null) {
406       waitTimers.remove(context);
407       Sink lockResponseSink = context.getLockResponseSink();
408       int lockLevel = context.lockLevel();
409
410       // Add a wait Timeout message
411
lockResponseSink.add(createLockWaitTimeoutResponseContext(this.lockID, txn.getId(), lockLevel));
412
413       if (holders.size() == 0) {
414         if (isPolicyGreedy() && (getWaiterCount() == 0)) {
415           awardGreedyAndRespond(txn, lockLevel, lockResponseSink);
416         } else {
417           awardAndRespond(txn, lockLevel, lockResponseSink);
418         }
419       } else {
420         createPendingFromWaiter(context);
421       }
422     }
423   }
424
425   synchronized void wait(ServerThreadContext txn, WaitTimer waitTimer, WaitInvocation call, WaitTimerCallback callback,
426                          Sink lockResponseSink) throws TCIllegalMonitorStateException {
427     // debug("wait() - BEGIN -", txn, ", ", call);
428
if (waiters.containsKey(txn)) throw Assert.failure("Already in wait set: " + txn);
429     checkLegalWaitNotifyState(txn);
430
431     Holder current = getHolder(txn);
432     Assert.assertNotNull(current);
433     boolean isUpgrade = current.isUpgrade();
434
435     LockWaitContext waitContext = new LockWaitContextImpl(txn, this, call, current.getLockLevel(), lockResponseSink);
436     waiters.put(txn, waitContext);
437
438     scheduleWait(callback, waitTimer, waitContext);
439     txn.setWaitingOn(this);
440     removeCurrentHold(txn);
441
442     if (isUpgrade) {
443       // wait()'ing on an upgraded lock needs to release both the READ and WRITE
444
// locks held
445
removeCurrentHold(txn);
446     }
447
448     nextPending();
449   }
450
// This method re-establishes wait state and also schedules the wait timeout. There are cases where the wait
// must be ignored because we already know about it; note that it may be in either the waiting or the pending
// state.
453
synchronized void addRecalledWaiter(ServerThreadContext txn, WaitInvocation call, int lockLevel,
454                                       Sink lockResponseSink, WaitTimer waitTimer, WaitTimerCallback callback) {
455     // debug("addRecalledWaiter() - BEGIN -", txn, ", ", call);
456

457     LockWaitContext waitContext = new LockWaitContextImpl(txn, this, call, lockLevel, lockResponseSink);
458     if (waiters.containsKey(txn)) {
459       logger.debug("addRecalledWaiter(): Ignoring " + waitContext + " as it is already in waiters list.");
460       return;
461     }
462     Request request = new Request(txn, lockLevel, lockResponseSink);
463     if (pendingLockRequests.contains(request)) {
464       logger.debug("addRecalledWaiter(): Ignoring " + waitContext + " as it is already in pending list.");
465       return;
466     }
467     waiters.put(txn, waitContext);
468     scheduleWait(callback, waitTimer, waitContext);
469   }
470
// This method re-establishes wait state but does not schedule a wait timeout. It is called while the
// LockManager is starting; the wait timers are started once the lock manager itself has been started.
473
synchronized void reestablishWait(ServerThreadContext txn, WaitInvocation call, int lockLevel, Sink lockResponseSink) {
474     LockWaitContext waitContext = new LockWaitContextImpl(txn, this, call, lockLevel, lockResponseSink);
475     Object JavaDoc old = waiters.put(txn, waitContext);
476     if (old != null) throw Assert.failure("Already in wait set: " + txn);
477   }
478
479   synchronized void reestablishLock(ServerThreadContext threadContext, int requestedLevel, Sink lockResponseSink) {
480     if ((LockLevel.isWrite(requestedLevel) && holders.size() != 0)
481         || (LockLevel.isRead(requestedLevel) && LockLevel.isWrite(this.level))) {
482       //
483
throw new AssertionError JavaDoc("Lock " + this + " already held by other Holder. Can't grant to " + threadContext
484                                + LockLevel.toString(requestedLevel));
485
486     }
487     if (waiters.get(threadContext) != null) {
488       //
489
throw new AssertionError JavaDoc("Thread " + threadContext + "is already in Wait state for Lock " + this
490                                + ". Can't grant Lock Hold !");
491     }
492     if (isGreedyRequest(threadContext)) {
493       int greedyLevel = LockLevel.makeGreedy(requestedLevel);
494       ChannelID ch = threadContext.getId().getChannelID();
495       awardLock(threadContext, greedyLevel);
496       Holder holder = getHolder(threadContext);
497       holder.setSink(lockResponseSink);
498       greedyHolders.put(ch, holder);
499     } else {
500       awardLock(threadContext, requestedLevel);
501     }
502   }
503
504   private void scheduleWait(WaitTimerCallback callback, WaitTimer waitTimer, LockWaitContext waitContext) {
505     final TimerTask JavaDoc timer = waitTimer.scheduleTimer(callback, waitContext.getWaitInvocation(), waitContext);
506     if (timer != null) {
507       waitTimers.put(waitContext, timer);
508     }
509   }
510
511   private void checkLegalWaitNotifyState(ServerThreadContext threadContext) throws TCIllegalMonitorStateException {
512     Assert.assertFalse(isNull());
513
514     final int holdersSize = holders.size();
515     if (holdersSize != 1) { throw new TCIllegalMonitorStateException("Invalid holder set size: " + holdersSize); }
516
517     final int currentLevel = this.level;
518     if (!LockLevel.isWrite(currentLevel)) { throw new TCIllegalMonitorStateException("Incorrect lock level: "
519                                                                                      + LockLevel.toString(currentLevel)); }
520
521     Holder holder = getHolder(threadContext);
522     if (holder == null) {
523       holder = getHolder(getClientVMContext(threadContext));
524     }
525
526     if (holder == null) {
527       // make formatter sane
528
throw new TCIllegalMonitorStateException(threadContext + " is not the current lock holder for: " + threadContext);
529     }
530   }
531
532   private ServerThreadContext getClientVMContext(ServerThreadContext threadContext) {
533     return threadContextFactory.getOrCreate(threadContext.getId().getChannelID(), ThreadID.VM_ID);
534   }
535
536   public synchronized int getHoldersCount() {
537     return holders.size();
538   }
539
540   public synchronized int getPendingCount() {
541     return pendingLockRequests.size();
542   }
543
544   public synchronized int getPendingUpgradeCount() {
545     return pendingLockUpgrades.size();
546   }
547
548   Collection JavaDoc getHoldersCollection() {
549     return Collections.unmodifiableCollection(this.holders.values());
550   }
551
552   public synchronized String JavaDoc toString() {
553     try {
554       StringBuffer JavaDoc rv = new StringBuffer JavaDoc();
555
556       rv.append(lockID).append(", ").append("Level: ").append(LockLevel.toString(this.level)).append("\r\n");
557
558       rv.append("Holders (").append(holders.size()).append(")\r\n");
559       for (Iterator JavaDoc iter = holders.values().iterator(); iter.hasNext();) {
560         rv.append('\t').append(iter.next().toString()).append("\r\n");
561       }
562
563       rv.append("Wait Set (").append(waiters.size()).append(")\r\n");
564       for (Iterator JavaDoc iter = waiters.values().iterator(); iter.hasNext();) {
565         rv.append('\t').append(iter.next().toString()).append("\r\n");
566       }
567
568       rv.append("Pending lock requests (").append(pendingLockRequests.size()).append(")\r\n");
569       for (Iterator JavaDoc iter = pendingLockRequests.iterator(); iter.hasNext();) {
570         rv.append('\t').append(iter.next().toString()).append("\r\n");
571       }
572
573       rv.append("Pending lock upgrades (").append(pendingLockUpgrades.size()).append(")\r\n");
574       for (Iterator JavaDoc iter = pendingLockUpgrades.iterator(); iter.hasNext();) {
575         rv.append('\t').append(iter.next().toString()).append("\r\n");
576       }
577
578       return rv.toString();
579     } catch (Throwable JavaDoc t) {
580       t.printStackTrace();
581       return "Exception in toString(): " + t.getMessage();
582     }
583   }
584
585   private void awardLock(ServerThreadContext threadContext, int lockLevel) {
586     Assert.assertFalse(isNull());
587
588     Holder holder = getHolder(threadContext);
589
590     if (holder != null) {
591       holder.addLockLevel(lockLevel);
592       this.level = holder.getLockLevel();
593     } else {
594       threadContext.addLock(this);
595       holder = new Holder(this.lockID, threadContext, this.timeout);
596       holder.addLockLevel(lockLevel);
597       Object JavaDoc prev = this.holders.put(threadContext, holder);
598       Assert.assertNull(prev);
599       this.level = holder.getLockLevel();
600       notifyAwardLock(holder);
601     }
602
603   }
604
605   private void notifyAwardLock(Holder holder) {
606     final int waitingCount = this.pendingLockRequests.size();
607
608     for (int i = 0; i < listeners.length; i++) {
609       listeners[i].notifyAward(waitingCount, holder);
610     }
611   }
612
613   public synchronized boolean isRead() {
614     return LockLevel.READ == this.level;
615   }
616
617   public synchronized boolean isWrite() {
618     return LockLevel.WRITE == this.level;
619   }
620
621   // XXX:: Note that lockLevel == UPGRADE (in notified waits) also get into pendingLockRequests which is correct
622
synchronized void addPending(ServerThreadContext threadContext, int lockLevel, Sink awardLockSink) {
623     Assert.assertFalse(isNull());
624     // debug("addPending() - BEGIN -", threadContext, ", ", LockLevel.toString(lockLevel));
625

626     Request request = new Request(threadContext, lockLevel, awardLockSink);
627
628     if ((lockLevel == LockLevel.WRITE) && holdsReadLock(threadContext)) {
629       // this is a lock upgrade request
630
this.pendingLockUpgrades.add(request);
631     } else {
632       if (pendingLockRequests.contains(request)) {
633         logger.debug("Ignoring existing Request " + request + " in Lock " + lockID);
634         return;
635       }
636
637       this.pendingLockRequests.add(request);
638       for (Iterator JavaDoc currentHolders = holders.values().iterator(); currentHolders.hasNext();) {
639         Holder holder = (Holder) currentHolders.next();
640         notifyAddPending(holder);
641       }
642     }
643
644     threadContext.setWaitingOn(this);
645   }
646
647   private boolean holdsReadLock(ServerThreadContext threadContext) {
648     Holder holder = getHolder(threadContext);
649     if (holder != null) { return holder.getLockLevel() == LockLevel.READ; }
650     return false;
651   }
652
653   private Holder getHolder(ServerThreadContext threadContext) {
654     return (Holder) this.holders.get(threadContext);
655   }
656
657   private void notifyAddPending(Holder holder) {
658     final int waitingCount = this.pendingLockRequests.size();
659
660     for (int i = 0; i < this.listeners.length; i++) {
661       this.listeners[i].notifyAddPending(waitingCount, holder);
662     }
663   }
664
665   synchronized int getWaiterCount() {
666     return this.waiters.size();
667   }
668
669   synchronized boolean hasPending() {
670     return pendingLockRequests.size() > 0 || pendingLockUpgrades.size() > 0;
671   }
672
673   synchronized boolean hasWaiting() {
674     return this.waiters.size() > 0;
675   }
676
677   boolean hasGreedyHolders() {
678     return this.greedyHolders.size() > 0;
679   }
680
681   synchronized boolean hasWaiting(ServerThreadContext threadContext) {
682     return (this.waiters.get(threadContext) != null);
683   }
684
685   public LockID getLockID() {
686     return lockID;
687   }
688
689   public boolean isNull() {
690     return this.isNull;
691   }
692
693   public int hashCode() {
694     return this.lockID.hashCode();
695   }
696
697   public boolean equals(Object JavaDoc obj) {
698     if (obj instanceof Lock) {
699       Lock other = (Lock) obj;
700       return this.lockID.equals(other.lockID);
701     }
702     return false;
703   }
704
705   synchronized boolean nextPending() {
706     Assert.eval(!isNull());
707     // debug("nextPending() - BEGIN -");
708

709     boolean clear;
710     try {
711       // FIXME:: If ever there are two lock upgrade request, then it is a deadlock situation.
712
// Can be easily fixed if we know what is the right solution to the problem.
713
if ((holders.size() == 1) && (!pendingLockUpgrades.isEmpty())) {
714         // Can we award an upgrade?
715
Request request = (Request) pendingLockUpgrades.get(0);
716         if (holdsReadLock(request.getThreadContext())) {
717           // Upgrades are not given greedily
718
// debug("nextPending() - Giving Upgrade -", request);
719
pendingLockUpgrades.remove(0);
720           grantRequest(request);
721         }
722       } else if (holders.isEmpty()) {
723         if (!pendingLockRequests.isEmpty()) {
724           Request request = (Request) pendingLockRequests.get(0);
725           int reqLockLevel = request.getLockLevel();
726           switch (reqLockLevel) {
727             case LockLevel.WRITE: {
728               // Give locks greedily only if there is no one waiting or pending for this lock
729
if (isPolicyGreedy() && isAllPendingLockRequestsFromChannel(request.getRequesterID())
730                   && (getWaiterCount() == 0)) {
731                 // debug("nextPending() - Giving GREEDY WRITE request -", request);
732
pendingLockRequests.remove(0);
733                 grantGreedyRequest(request);
734                 break;
735               }
736               // else fall thru
737
}
738             case UPGRADE: {
739               // debug("nextPending() - granting not greedily request -", request);
740
// Upgrades are not given greedily
741
pendingLockRequests.remove(0);
742               grantRequest(request);
743               break;
744             }
745             case LockLevel.READ: {
746               // debug("nextPending() - granting READ request -", request);
747
awardAllReads();
748               break;
749             }
750             default: {
751               throw new TCInternalError("Unknown lock level in request: " + reqLockLevel);
752             }
753           }
754         }
755       }
756
757     } finally {
758       clear = holders.size() == 0 && this.waiters.size() == 0 && this.pendingLockRequests.size() == 0
759               && this.pendingLockUpgrades.size() == 0;
760     }
761
762     return clear;
763   }
764
765   private void grantGreedyRequest(Request request) {
766     // debug("grantGreedyRequest() - BEGIN -", request);
767
ServerThreadContext threadContext = request.getThreadContext();
768     awardGreedyAndRespond(threadContext, request.getLockLevel(), request.getLockResponseSink());
769     clearWaitingOn(threadContext);
770   }
771
772   private void grantRequest(Request request) {
773     // debug("grantRequest() - BEGIN -", request);
774
ServerThreadContext threadContext = request.getThreadContext();
775     awardLock(threadContext, request.getLockLevel());
776     clearWaitingOn(threadContext);
777     request.execute(lockID);
778   }
779
780   /**
781    * Remove the specified lock hold.
782    *
783    * @return true if the current hold was an upgrade
784    */

785   synchronized boolean removeCurrentHold(ServerThreadContext threadContext) {
786     // debug("removeCurrentHold() - BEGIN -", threadContext);
787
Holder holder = getHolder(threadContext);
788     if (holder != null) {
789       if (holder.isUpgrade()) {
790         holder.removeLockLevel(LockLevel.WRITE);
791         this.level = holder.getLockLevel();
792         return true;
793       } else {
794         this.holders.remove(threadContext);
795         threadContext.removeLock(this);
796         threadContextFactory.removeIfClear(threadContext);
797         if (isGreedyRequest(threadContext)) {
798           removeGreedyHolder(threadContext.getId().getChannelID());
799         }
800         this.level = (holders.size() == 0 ? LockLevel.NIL_LOCK_LEVEL : LockLevel.READ);
801         notifyRevoke(holder);
802       }
803     }
804     return false;
805   }
806
807   synchronized boolean recallCommit(ServerThreadContext threadContext) {
808     // debug("recallCommit() - BEGIN -", threadContext);
809
Assert.assertTrue(isGreedyRequest(threadContext));
810     boolean issueRecall = !recalled;
811     removeCurrentHold(threadContext);
812     if (issueRecall) {
813       recall(LockLevel.WRITE);
814     }
815     if (recalled == false) { return nextPending(); }
816     return false;
817   }
818
819   private synchronized void removeGreedyHolder(ChannelID channelID) {
820     // debug("removeGreedyHolder() - BEGIN -", channelID);
821
greedyHolders.remove(channelID);
822     if (!hasGreedyHolders()) {
823       recalled = false;
824     }
825   }
826
827   private void clearWaitingOn(ServerThreadContext threadContext) {
828     threadContext.clearWaitingOn();
829     threadContextFactory.removeIfClear(threadContext);
830   }
831
832   synchronized void awardAllReads() {
833     // debug("awardAllReads() - BEGIN -");
834
List JavaDoc pendingReadLockRequests = new ArrayList JavaDoc(pendingLockRequests.size());
835     boolean hasPendingWrites = false;
836
837     for (Iterator JavaDoc i = pendingLockRequests.iterator(); i.hasNext();) {
838       Request request = (Request) i.next();
839       if (request.getLockLevel() == LockLevel.READ) {
840         pendingReadLockRequests.add(request);
841         i.remove();
842       } else if (!hasPendingWrites && request.getLockLevel() == LockLevel.WRITE) {
843         hasPendingWrites = true;
844       }
845     }
846
847     for (Iterator JavaDoc i = pendingReadLockRequests.iterator(); i.hasNext();) {
848       Request request = (Request) i.next();
849       if (isPolicyGreedy() && !hasPendingWrites) {
850         ServerThreadContext tid = request.getThreadContext();
851         if (!holdsGreedyLock(tid)) {
852           grantGreedyRequest(request);
853         } else {
854           // These can be awarded locally in the client ...
855
clearWaitingOn(tid);
856         }
857       } else {
858         grantRequest(request);
859       }
860     }
861   }
862
863   synchronized boolean holdsSomeLock(ChannelID ch) {
864     for (Iterator JavaDoc iter = holders.values().iterator(); iter.hasNext();) {
865       Holder holder = (Holder) iter.next();
866       if (holder.getChannelID().equals(ch)) { return true; }
867     }
868     return false;
869   }
870
871   synchronized boolean holdsGreedyLock(ServerThreadContext threadContext) {
872     return (greedyHolders.get(threadContext.getId().getChannelID()) != null);
873   }
874
875   synchronized boolean canAwardGreedilyOnTheClient(ServerThreadContext threadContext, int lockLevel) {
876     Holder holder = (Holder) greedyHolders.get(threadContext.getId().getChannelID());
877     if (holder != null) { return (LockLevel.isWrite(holder.getLockLevel()) || holder.getLockLevel() == lockLevel); }
878     return false;
879   }
880
881   private void notifyRevoke(Holder holder) {
882     for (int i = 0; i < this.listeners.length; i++) {
883       this.listeners[i].notifyRevoke(holder);
884     }
885   }
886
887   void notifyStarted(WaitTimerCallback callback, WaitTimer timer) {
888     for (Iterator JavaDoc i = waiters.values().iterator(); i.hasNext();) {
889       LockWaitContext ctxt = (LockWaitContext) i.next();
890       scheduleWait(callback, timer, ctxt);
891     }
892   }
893
894   synchronized boolean isAllPendingLockRequestsFromChannel(ChannelID channelId) {
895     for (Iterator JavaDoc i = pendingLockRequests.iterator(); i.hasNext();) {
896       Request r = (Request) i.next();
897       if (!r.getRequesterID().equals(channelId)) { return false; }
898     }
899     return true;
900   }
901
902   /**
903    * This clears out stuff from the pending and wait lists that belonged to a dead session. It occurs to me that this is
904    * a race condition because a request could come in on the connection, then the cleanup could happen, and then the
905    * request could be processed. We need to drop requests that are processed after the cleanup
906    *
907    * @param channelId
908    */

909   synchronized void clearStateForChannel(ChannelID channelId) {
910     // debug("clearStateForChannel() - BEGIN -", channelId);
911
for (Iterator JavaDoc i = holders.values().iterator(); i.hasNext();) {
912       Holder holder = (Holder) i.next();
913       if (holder.getChannelID().equals(channelId)) {
914         i.remove();
915       }
916     }
917
918     for (Iterator JavaDoc i = pendingLockUpgrades.iterator(); i.hasNext();) {
919       Request r = (Request) i.next();
920       if (r.getRequesterID().equals(channelId)) {
921         i.remove();
922       }
923     }
924
925     for (Iterator JavaDoc i = pendingLockRequests.iterator(); i.hasNext();) {
926       Request r = (Request) i.next();
927       if (r.getRequesterID().equals(channelId)) {
928         i.remove();
929       }
930     }
931
932     for (Iterator JavaDoc i = waiters.values().iterator(); i.hasNext();) {
933       LockWaitContext wc = (LockWaitContext) i.next();
934       if (wc.getChannelID().equals(channelId)) {
935         i.remove();
936       }
937     }
938
939     for (Iterator JavaDoc i = waitTimers.keySet().iterator(); i.hasNext();) {
940       LockWaitContext wc = (LockWaitContext) i.next();
941       if (wc.getChannelID().equals(channelId)) {
942         try {
943           TimerTask JavaDoc task = (TimerTask JavaDoc) waitTimers.get(wc);
944           task.cancel();
945         } finally {
946           i.remove();
947         }
948       }
949     }
950     removeGreedyHolder(channelId);
951   }
952
953   synchronized void checkAndClearStateOnGreedyAward(ChannelID ch, int requestedLevel) {
954     // We dont want to award a greedy lock for lock upgrades or if there are waiters.
955
// debug("checkAndClearStateOnGreedyAward For ", ch, ", ", LockLevel.toString(requestedLevel));
956
// debug("checkAndClear... BEFORE Lock = ", this);
957
Assert.assertTrue(pendingLockUpgrades.size() == 0);
958     Assert.assertTrue((requestedLevel == LockLevel.READ) || (waiters.size() == 0));
959
960     for (Iterator JavaDoc i = holders.values().iterator(); i.hasNext();) {
961       Holder holder = (Holder) i.next();
962       if (holder.getChannelID().equals(ch)) {
963         i.remove();
964       }
965     }
966     for (Iterator JavaDoc i = pendingLockRequests.iterator(); i.hasNext();) {
967       Request r = (Request) i.next();
968       if (r.getRequesterID().equals(ch)) {
969         if ((requestedLevel == LockLevel.WRITE) || (r.getLockLevel() == requestedLevel)) {
970           // debug("checkAndClear... removing request = ", r);
971
i.remove();
972           ServerThreadContext tid = r.getThreadContext();
973           // debug("checkAndClear... clearing threadContext = ", tid);
974
clearWaitingOn(tid);
975         } else {
976           throw new AssertionError JavaDoc("Issuing READ lock greedily when WRITE pending !");
977         }
978       }
979     }
980     // debug("checkAndClear... AFTER Lock = ", this);
981
}
982
983   // I wish we were using 1.5 !!!
984
// private void debug(Object o1, Object o2) {
985
// logger.warn(lockID + String.valueOf(o1) + String.valueOf(o2));
986
// }
987
//
988
// private void debug(Object o1, Object o2, Object o3) {
989
// logger.warn(lockID + String.valueOf(o1) + String.valueOf(o2) + String.valueOf(o3));
990
// }
991
//
992
// private void debug(Object o1, Object o2, Object o3, Object o4) {
993
// logger.warn(lockID + String.valueOf(o1) + String.valueOf(o2) + String.valueOf(o3) + String.valueOf(o4));
994
// }
995
//
996
// private void debug(Object o) {
997
// logger.warn(lockID + String.valueOf(o));
998
// }
999

1000}
1001
Popular Tags