KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > objectserver > lockmanager > impl > LockManagerTest


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
3  */

4 package com.tc.objectserver.lockmanager.impl;
5
6 import org.apache.commons.io.output.NullOutputStream;
7
8 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
9
10 import com.tc.exception.ImplementMe;
11 import com.tc.net.TCSocketAddress;
12 import com.tc.net.protocol.NetworkStackID;
13 import com.tc.net.protocol.TCNetworkMessage;
14 import com.tc.net.protocol.tcm.ChannelEventListener;
15 import com.tc.net.protocol.tcm.ChannelID;
16 import com.tc.net.protocol.tcm.MessageChannel;
17 import com.tc.net.protocol.tcm.TCMessage;
18 import com.tc.net.protocol.tcm.TCMessageType;
19 import com.tc.object.lockmanager.api.LockID;
20 import com.tc.object.lockmanager.api.LockLevel;
21 import com.tc.object.lockmanager.api.ServerThreadID;
22 import com.tc.object.lockmanager.api.ThreadID;
23 import com.tc.object.tx.WaitInvocation;
24 import com.tc.objectserver.api.TestSink;
25 import com.tc.objectserver.context.LockResponseContext;
26 import com.tc.objectserver.lockmanager.api.DeadlockChain;
27 import com.tc.objectserver.lockmanager.api.DeadlockResults;
28 import com.tc.objectserver.lockmanager.api.LockHolder;
29 import com.tc.objectserver.lockmanager.api.LockMBean;
30 import com.tc.objectserver.lockmanager.api.NullChannelManager;
31 import com.tc.objectserver.lockmanager.api.ServerLockRequest;
32 import com.tc.objectserver.lockmanager.api.Waiter;
33 import com.tc.util.concurrent.ThreadUtil;
34
35 import java.io.IOException JavaDoc;
36 import java.io.ObjectOutputStream JavaDoc;
37 import java.net.UnknownHostException JavaDoc;
38 import java.util.ArrayList JavaDoc;
39 import java.util.Arrays JavaDoc;
40 import java.util.Comparator JavaDoc;
41 import java.util.HashMap JavaDoc;
42 import java.util.Iterator JavaDoc;
43 import java.util.List JavaDoc;
44 import java.util.Map JavaDoc;
45 import java.util.Random JavaDoc;
46
47 import junit.framework.TestCase;
48
49 /**
50  * @author steve
51  */

52 public class LockManagerTest extends TestCase {
53   private TestSink sink;
54   private LockManagerImpl lockManager;
55   private Random JavaDoc random = new Random JavaDoc();
56
57   final int numLocks = 30;
58   final int numThreads = 15;
59   private LockID[] locks = makeUniqueLocks(numLocks);
60   private ServerThreadID[] txns = makeUniqueTxns(numThreads);
61
62   protected void setUp() throws Exception JavaDoc {
63     super.setUp();
64     resetLockManager();
65     sink = new TestSink();
66   }
67
68   private void resetLockManager() {
69     resetLockManager(false);
70   }
71
72   private void resetLockManager(boolean start) {
73     if (lockManager != null) {
74       try {
75         lockManager.stop();
76       } catch (InterruptedException JavaDoc e) {
77         fail();
78       }
79     }
80
81     lockManager = new LockManagerImpl(new NullChannelManager());
82     lockManager.setLockPolicy(LockManagerImpl.ALTRUISTIC_LOCK_POLICY);
83     if (start) {
84       lockManager.start();
85     }
86   }
87
88   protected void tearDown() throws Exception JavaDoc {
89     assertEquals(0, lockManager.getLockCount());
90     assertEquals(0, lockManager.getThreadContextCount());
91     super.tearDown();
92   }
93
94   public void testLockMBean() throws IOException JavaDoc {
95
96     final MessageChannel channel = new TestMessageChannel();
97
98     final long start = System.currentTimeMillis();
99     final ChannelID cid1 = new ChannelID(1);
100     ChannelID cid2 = new ChannelID(2);
101     LockID lid1 = new LockID("1");
102     LockID lid2 = new LockID("2");
103     LockID lid3 = new LockID("3");
104     ThreadID tid1 = new ThreadID(1);
105     WaitInvocation wait = new WaitInvocation(Integer.MAX_VALUE);
106
107     lockManager = new LockManagerImpl(new NullChannelManager() {
108       public MessageChannel getChannel(ChannelID id) {
109         if (cid1.equals(id)) { return channel; }
110         return null;
111       }
112
113       public String JavaDoc getChannelAddress(ChannelID channelID) {
114         if (cid1.equals(channelID)) { return "127.0.0.1:6969"; }
115         return "no longer connected";
116       }
117     });
118
119     lockManager.setLockPolicy(LockManagerImpl.ALTRUISTIC_LOCK_POLICY);
120     lockManager.start();
121
122     lockManager.requestLock(lid1, cid1, tid1, LockLevel.WRITE, sink); // hold
123
lockManager.requestLock(lid1, cid2, tid1, LockLevel.WRITE, sink); // pending
124

125     lockManager.requestLock(lid2, cid1, tid1, LockLevel.READ, sink); // hold
126
lockManager.requestLock(lid2, cid2, tid1, LockLevel.READ, sink); // hold
127
lockManager.requestLock(lid2, cid1, tid1, LockLevel.WRITE, sink); // upgrade
128

129     lockManager.requestLock(lid3, cid1, tid1, LockLevel.WRITE, sink); // hold
130
lockManager.wait(lid3, cid1, tid1, wait, sink); // wait
131

132     LockMBean[] lockBeans = lockManager.getAllLocks();
133     assertEquals(3, lockBeans.length);
134     sortLocksByID(lockBeans);
135
136     LockMBean bean1 = lockBeans[0];
137     LockMBean bean2 = lockBeans[1];
138     LockMBean bean3 = lockBeans[2];
139     testSerialize(bean1);
140     testSerialize(bean2);
141     testSerialize(bean3);
142
143     validateBean1(bean1, start);
144     validateBean2(bean2, start);
145     validateBean3(bean3, start, wait);
146
147     lockManager.clearAllLocksFor(cid1);
148     lockManager.clearAllLocksFor(cid2);
149   }
150
151   private void validateBean3(LockMBean bean3, long time, WaitInvocation wait) {
152     LockHolder[] holders = bean3.getHolders();
153     ServerLockRequest[] reqs = bean3.getPendingRequests();
154     ServerLockRequest[] upgrades = bean3.getPendingUpgrades();
155     Waiter[] waiters = bean3.getWaiters();
156     assertEquals(0, holders.length);
157     assertEquals(0, reqs.length);
158     assertEquals(0, upgrades.length);
159     assertEquals(1, waiters.length);
160
161     Waiter waiter = waiters[0];
162     assertEquals(wait.toString(), waiter.getWaitInvocation());
163     assertTrue(waiter.getStartTime() >= time);
164     assertEquals(new ChannelID(1), waiter.getChannelID());
165     assertEquals("127.0.0.1:6969", waiter.getChannelAddr());
166     assertEquals(new ThreadID(1), waiter.getThreadID());
167   }
168
169   private void validateBean2(LockMBean bean2, long time) {
170     LockHolder[] holders = bean2.getHolders();
171     ServerLockRequest[] reqs = bean2.getPendingRequests();
172     ServerLockRequest[] upgrades = bean2.getPendingUpgrades();
173     Waiter[] waiters = bean2.getWaiters();
174     assertEquals(2, holders.length);
175     assertEquals(0, reqs.length);
176     assertEquals(1, upgrades.length);
177     assertEquals(0, waiters.length);
178
179     LockHolder holder = holders[0];
180     assertEquals(LockLevel.toString(LockLevel.READ), holder.getLockLevel());
181     assertTrue(holder.getTimeAcquired() >= time);
182     assertEquals(new ChannelID(1), holder.getChannelID());
183     assertEquals("127.0.0.1:6969", holder.getChannelAddr());
184     assertEquals(new ThreadID(1), holder.getThreadID());
185
186     holder = holders[1];
187     assertEquals(LockLevel.toString(LockLevel.READ), holder.getLockLevel());
188     assertTrue(holder.getTimeAcquired() >= time);
189     assertEquals(new ChannelID(2), holder.getChannelID());
190     assertEquals("no longer connected", holder.getChannelAddr());
191     assertEquals(new ThreadID(1), holder.getThreadID());
192
193     ServerLockRequest up = upgrades[0];
194     assertEquals(LockLevel.toString(LockLevel.WRITE), up.getLockLevel());
195     assertTrue(up.getRequestTime() >= time);
196     assertEquals(new ChannelID(1), up.getChannelID());
197     assertEquals("127.0.0.1:6969", up.getChannelAddr());
198     assertEquals(new ThreadID(1), up.getThreadID());
199   }
200
201   private void validateBean1(LockMBean bean1, long time) {
202     LockHolder[] holders = bean1.getHolders();
203     ServerLockRequest[] reqs = bean1.getPendingRequests();
204     ServerLockRequest[] upgrades = bean1.getPendingUpgrades();
205     Waiter[] waiters = bean1.getWaiters();
206     assertEquals(1, holders.length);
207     assertEquals(1, reqs.length);
208     assertEquals(0, upgrades.length);
209     assertEquals(0, waiters.length);
210
211     LockHolder holder = holders[0];
212     assertEquals(LockLevel.toString(LockLevel.WRITE), holder.getLockLevel());
213     assertTrue(holder.getTimeAcquired() >= time);
214     assertEquals(new ChannelID(1), holder.getChannelID());
215     assertEquals(new ThreadID(1), holder.getThreadID());
216     assertEquals("127.0.0.1:6969", holder.getChannelAddr());
217
218     ServerLockRequest req = reqs[0];
219     assertEquals(LockLevel.toString(LockLevel.WRITE), req.getLockLevel());
220     assertTrue(req.getRequestTime() >= time);
221     assertEquals(new ChannelID(2), req.getChannelID());
222     assertEquals("no longer connected", req.getChannelAddr());
223     assertEquals(new ThreadID(1), req.getThreadID());
224   }
225
226   private void testSerialize(Object JavaDoc o) throws IOException JavaDoc {
227     ObjectOutputStream JavaDoc oos = new ObjectOutputStream JavaDoc(new NullOutputStream());
228     oos.writeObject(o);
229     oos.close();
230   }
231
232   private void sortLocksByID(LockMBean[] lockBeans) {
233     Arrays.sort(lockBeans, new Comparator JavaDoc() {
234       public int compare(Object JavaDoc o1, Object JavaDoc o2) {
235         LockMBean l1 = (LockMBean) o1;
236         LockMBean l2 = (LockMBean) o2;
237
238         String JavaDoc id1 = l1.getLockName();
239         String JavaDoc id2 = l2.getLockName();
240
241         return id1.compareTo(id2);
242       }
243     });
244   }
245
246   public void testReestablishWait() throws Exception JavaDoc {
247     LockID lockID1 = new LockID("my lock");
248     ChannelID channel1 = new ChannelID(1);
249     ThreadID tx1 = new ThreadID(1);
250     ThreadID tx2 = new ThreadID(2);
251
252     try {
253       assertEquals(0, lockManager.getLockCount());
254       long waitTime = 1000;
255       WaitInvocation waitCall = new WaitInvocation(waitTime);
256       TestSink responseSink = new TestSink();
257       long t0 = System.currentTimeMillis();
258       lockManager.reestablishWait(lockID1, channel1, tx1, LockLevel.WRITE, waitCall, responseSink);
259       lockManager.reestablishWait(lockID1, channel1, tx2, LockLevel.WRITE, waitCall, responseSink);
260       lockManager.start();
261
262       LockResponseContext ctxt = (LockResponseContext) responseSink.waitForAdd(waitTime * 3);
263       assertTrue(ctxt != null);
264       assertTrue(System.currentTimeMillis() - t0 >= waitTime);
265       assertResponseContext(lockID1, channel1, tx1, LockLevel.WRITE, ctxt);
266       assertTrue(ctxt.isLockWaitTimeout());
267       ThreadUtil.reallySleep(waitTime * 3);
268       assertEquals(3, responseSink.size()); // 2 wait timeouts and 1 award
269
ctxt = (LockResponseContext) responseSink.take();
270       LockResponseContext ctxt1 = (LockResponseContext) responseSink.take();
271       LockResponseContext ctxt2 = (LockResponseContext) responseSink.take();
272       assertTrue((ctxt1.isLockAward() && ctxt2.isLockWaitTimeout())
273                  || (ctxt2.isLockAward() && ctxt1.isLockWaitTimeout()));
274
275     } finally {
276       lockManager = null;
277       resetLockManager();
278     }
279   }
280
281   public void testReestablishLockAfterReestablishWait() throws Exception JavaDoc {
282     LockID lockID1 = new LockID("my lock");
283     ChannelID channel1 = new ChannelID(1);
284     ThreadID tx1 = new ThreadID(1);
285     ThreadID tx2 = new ThreadID(2);
286     int requestedLevel = LockLevel.WRITE;
287     WaitInvocation waitCall = new WaitInvocation();
288     try {
289       TestSink responseSink = new TestSink();
290       assertEquals(0, lockManager.getLockCount());
291       lockManager.reestablishWait(lockID1, channel1, tx1, LockLevel.WRITE, waitCall, responseSink);
292       assertEquals(1, lockManager.getLockCount());
293       assertEquals(0, responseSink.getInternalQueue().size());
294
295       // now try to award the lock to the same client-transaction
296
try {
297         lockManager.reestablishLock(lockID1, channel1, tx1, requestedLevel, responseSink);
298         fail("Should have thrown an AssertionError.");
299       } catch (AssertionError JavaDoc e) {
300         // expected
301
}
302       // now try to reestablish the same lock from a different transaction. It
303
// sould succeed
304
assertEquals(1, lockManager.getLockCount());
305       lockManager.reestablishLock(lockID1, channel1, tx2, requestedLevel, responseSink);
306     } finally {
307       lockManager = null;
308       resetLockManager();
309     }
310   }
311
312   public void testReestablishReadLock() throws Exception JavaDoc {
313     LockID lockID1 = new LockID("my lock");
314     ChannelID channel1 = new ChannelID(1);
315     ThreadID tx1 = new ThreadID(1);
316     ThreadID tx2 = new ThreadID(2);
317     ThreadID tx3 = new ThreadID(3);
318     int requestedLevel = LockLevel.READ;
319
320     try {
321       TestSink responseSink = new TestSink();
322       assertEquals(0, lockManager.getLockCount());
323
324       lockManager.reestablishLock(lockID1, channel1, tx1, requestedLevel, responseSink);
325       assertEquals(1, lockManager.getLockCount());
326
327       // now reestablish the same read lock in another transaction. It should
328
// succeed.
329
lockManager.reestablishLock(lockID1, channel1, tx2, requestedLevel, responseSink);
330       assertEquals(1, lockManager.getLockCount());
331
332       // now reestablish the the same write lock. It should fail.
333
try {
334         lockManager.reestablishLock(lockID1, channel1, tx3, LockLevel.WRITE, responseSink);
335         fail("Should have thrown a LockManagerError.");
336       } catch (AssertionError JavaDoc e) {
337         //
338
}
339
340     } finally {
341       // this needs to be done for tearDown() to pass.
342
lockManager = null;
343       resetLockManager();
344     }
345
346     try {
347       TestSink responseSink = new TestSink();
348       assertEquals(0, lockManager.getLockCount());
349       lockManager.reestablishLock(lockID1, channel1, tx1, LockLevel.WRITE, responseSink);
350       assertEquals(1, lockManager.getLockCount());
351
352       // now reestablish a read lock. This should fail.
353
responseSink = new TestSink();
354       try {
355         lockManager.reestablishLock(lockID1, channel1, tx2, LockLevel.READ, responseSink);
356         fail("Should have thrown a LockManagerError");
357       } catch (AssertionError JavaDoc e) {
358         //
359
}
360
361     } finally {
362       lockManager = null;
363       resetLockManager();
364     }
365   }
366
367   public void testReestablishWriteLock() throws Exception JavaDoc {
368
369     LockID lockID1 = new LockID("my lock");
370     LockID lockID2 = new LockID("my other lock");
371     ChannelID channel1 = new ChannelID(1);
372     ChannelID channel2 = new ChannelID(2);
373     ThreadID tx1 = new ThreadID(1);
374     ThreadID tx2 = new ThreadID(2);
375     int requestedLevel = LockLevel.WRITE;
376
377     try {
378       TestSink responseSink = new TestSink();
379       assertEquals(0, lockManager.getLockCount());
380       lockManager.reestablishLock(lockID1, channel1, tx1, requestedLevel, responseSink);
381       assertEquals(1, lockManager.getLockCount());
382
383       try {
384         lockManager.reestablishLock(lockID1, channel2, tx2, requestedLevel, responseSink);
385         fail("Expected a LockManagerError!");
386       } catch (AssertionError JavaDoc e) {
387         //
388
}
389
390       // try to reestablish another lock. It should succeed.
391
lockManager.reestablishLock(lockID2, channel1, tx1, requestedLevel, responseSink);
392
393       lockManager.start();
394       // you shouldn't be able to call reestablishLock after the lock manager
395
// has started.
396
try {
397         lockManager.reestablishLock(lockID1, channel1, tx1, requestedLevel, null);
398         fail("Should have thrown a LockManagerError");
399       } catch (Error JavaDoc e) {
400         //
401
}
402
403     } finally {
404       // this needs to be done for tearDown() to pass.
405
lockManager = null;
406       resetLockManager();
407     }
408   }
409
// private void assertResponseSink(LockID lockID, ChannelID channel, TransactionID tx, int requestedLevel,
// TestSink responseSink) {
// assertEquals(1, responseSink.getInternalQueue().size());
// LockResponseContext ctxt = (LockResponseContext) responseSink.getInternalQueue().get(0);
// assertResponseContext(lockID, channel, tx, requestedLevel, ctxt);
// }
416

417   private void assertResponseContext(LockID lockID, ChannelID channel, ThreadID tx1, int requestedLevel,
418                                      LockResponseContext ctxt) {
419     assertEquals(lockID, ctxt.getLockID());
420     assertEquals(channel, ctxt.getChannelID());
421     assertEquals(tx1, ctxt.getThreadID());
422     assertEquals(requestedLevel, ctxt.getLockLevel());
423   }
424
425   public void testWaitTimeoutsIgnoredDuringStartup() throws Exception JavaDoc {
426     LockID lockID = new LockID("my lcok");
427     ChannelID channel1 = new ChannelID(1);
428     ThreadID tx1 = new ThreadID(1);
429     try {
430       long waitTime = 1000;
431       WaitInvocation waitInvocation = new WaitInvocation(waitTime);
432       TestSink responseSink = new TestSink();
433       lockManager.reestablishWait(lockID, channel1, tx1, LockLevel.WRITE, waitInvocation, responseSink);
434
435       LockResponseContext ctxt = (LockResponseContext) responseSink.waitForAdd(waitTime * 2);
436       assertNull(ctxt);
437
438       lockManager.start();
439       ctxt = (LockResponseContext) responseSink.waitForAdd(0);
440       assertNotNull(ctxt);
441     } finally {
442       lockManager = null;
443       resetLockManager();
444     }
445   }
446
447   public void testWaitTimeoutsIgnoredDuringShutdown() throws InterruptedException JavaDoc {
448
449     ChannelID channelID = new ChannelID(1);
450     LockID lockID = new LockID("1");
451     ThreadID txID = new ThreadID(1);
452
453     lockManager.start();
454     boolean granted = lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink);
455     assertTrue(granted);
456
457     lockManager.wait(lockID, channelID, txID, new WaitInvocation(1000), sink);
458     lockManager.stop();
459
460     assertFalse(lockManager.hasPending(lockID));
461     assertEquals(0, lockManager.getLockCount());
462
463     ThreadUtil.reallySleep(1500);
464
465     assertFalse(lockManager.hasPending(lockID));
466     assertEquals(0, lockManager.getLockCount());
467   }
468
469   public void testOffBlocksUntilNoOutstandingLocksViaWait() throws Exception JavaDoc {
470     // this is no longer expected behavior
471
if (true) return;
472     List JavaDoc queue = sink.getInternalQueue();
473     ChannelID channelID = new ChannelID(1);
474     LockID lockID = new LockID("1");
475     ThreadID txID = new ThreadID(1);
476
477     final LinkedQueue shutdownSteps = new LinkedQueue();
478     ShutdownThread shutdown = new ShutdownThread(shutdownSteps);
479
480     try {
481       lockManager.start();
482       lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink);
483       assertEquals(1, queue.size());
484
485       shutdown.start();
486       shutdownSteps.take();
487       ThreadUtil.reallySleep(500);
488       // make sure shutdown didn't complete.
489
assertTrue(shutdownSteps.peek() == null);
490
491       // make sure that waiting on an outstanding lock causes the lock to be
492
// released and allows shutdown to
493
// complete.
494
lockManager.wait(lockID, channelID, new ThreadID(1), new WaitInvocation(), sink);
495       shutdownSteps.take();
496
497     } finally {
498       lockManager.clearAllLocksFor(channelID);
499     }
500   }
501
502   public void testOffDoesNotBlockUntilNoOutstandingLocksViaUnlock() throws Exception JavaDoc {
503     List JavaDoc queue = sink.getInternalQueue();
504     ChannelID channel1 = new ChannelID(1);
505     LockID lock1 = new LockID("1");
506     ThreadID tx1 = new ThreadID(1);
507
508     final LinkedQueue shutdownSteps = new LinkedQueue();
509     ShutdownThread shutdown = new ShutdownThread(shutdownSteps);
510     try {
511       lockManager.start();
512       lockManager.requestLock(lock1, channel1, tx1, LockLevel.WRITE, sink);
513       assertEquals(1, queue.size());
514
515       shutdown.start();
516       shutdownSteps.take();
517       ThreadUtil.reallySleep(1000);
518       shutdownSteps.take();
519     } finally {
520       lockManager = null;
521       resetLockManager();
522     }
523   }
524
525   public void testOffCancelsWaits() throws Exception JavaDoc {
526     // implement me.
527
}
528
529   public void testOffStopsGrantingNewLocks() throws Exception JavaDoc {
530     List JavaDoc queue = sink.getInternalQueue();
531     ChannelID channelID = new ChannelID(1);
532     LockID lockID = new LockID("1");
533     ThreadID txID = new ThreadID(1);
534     try {
535       // Test that the normal case works as expected...
536
lockManager.start();
537       lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink);
538       assertEquals(1, queue.size());
539       queue.clear();
540       lockManager.unlock(lockID, channelID, txID);
541       lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink);
542       assertEquals(1, queue.size());
543       lockManager.unlock(lockID, channelID, txID);
544
545       // Call shutdown and make sure that the lock isn't granted via the
546
// "requestLock" method
547
queue.clear();
548       lockManager.stop();
549       lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink);
550       assertEquals(0, queue.size());
551     } finally {
552       lockManager.clearAllLocksFor(channelID);
553     }
554   }
555
556   public void testRequestDoesntGrantPendingLocks() throws Exception JavaDoc {
557     List JavaDoc queue = sink.getInternalQueue();
558     ChannelID channelID = new ChannelID(1);
559     LockID lockID = new LockID("1");
560     ThreadID txID = new ThreadID(1);
561
562     try {
563       lockManager.start();
564       // now try stacking locks and make sure that calling unlock doesn't grant
565
// the pending locks.
566
lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink);
567       queue.clear();
568       lockManager.requestLock(lockID, channelID, new ThreadID(2), LockLevel.WRITE, sink);
569       // the second lock should be pending.
570
assertEquals(0, queue.size());
571     } finally {
572       lockManager = null;
573       resetLockManager();
574     }
575   }
576
577   public void testUnlockIgnoredDuringShutdown() throws Exception JavaDoc {
578     List JavaDoc queue = sink.getInternalQueue();
579     ChannelID channelID = new ChannelID(1);
580     LockID lockID = new LockID("1");
581     ThreadID txID = new ThreadID(1);
582     try {
583       lockManager.start();
584       // now try stacking locks and make sure that calling unlock doesn't grant
585
// the pending locks.
586
lockManager.requestLock(lockID, channelID, txID, LockLevel.WRITE, sink);
587       queue.clear();
588       lockManager.requestLock(lockID, channelID, new ThreadID(2), LockLevel.WRITE, sink);
589       // the second lock should be pending.
590
assertEquals(0, queue.size());
591
592       lockManager.stop();
593
594       // unlock the first lock
595
lockManager.unlock(lockID, channelID, txID);
596       // the second lock should still be pending
597
assertEquals(0, queue.size());
598
599     } finally {
600       lockManager = null;
601       resetLockManager();
602     }
603   }
604
605   public void testLockManagerBasics() {
606     LockID l1 = new LockID("1");
607     ChannelID c1 = new ChannelID(1);
608     ThreadID s1 = new ThreadID(0);
609
610     ChannelID c2 = new ChannelID(2);
611     ChannelID c3 = new ChannelID(3);
612     ChannelID c4 = new ChannelID(4);
613     lockManager.start();
614     lockManager.requestLock(l1, c1, s1, LockLevel.WRITE, sink);
615     assertTrue(sink.size() == 1);
616     System.out.println(sink.getInternalQueue().remove(0));
617
618     lockManager.requestLock(l1, c2, s1, LockLevel.WRITE, sink);
619     assertTrue(sink.size() == 0);
620     lockManager.unlock(l1, c1, s1);
621     assertTrue(sink.size() == 1);
622     System.out.println(sink.getInternalQueue().remove(0));
623
624     lockManager.requestLock(l1, c3, s1, LockLevel.READ, sink);
625     assertTrue(sink.size() == 0);
626     assertTrue(lockManager.hasPending(l1));
627     lockManager.unlock(l1, c2, s1);
628     assertTrue(sink.size() == 1);
629     assertFalse(lockManager.hasPending(l1));
630
631     lockManager.requestLock(l1, c4, s1, LockLevel.WRITE, sink);
632     assertTrue(lockManager.hasPending(l1));
633     lockManager.unlock(l1, c2, s1);
634     assertTrue(lockManager.hasPending(l1));
635     lockManager.unlock(l1, c3, s1);
636     assertFalse(lockManager.hasPending(l1));
637     lockManager.unlock(l1, c4, s1);
638   }
639
640   public void testDeadLock1() {
641     // A simple deadlock. Thread 1 holds lock1, wants lock2. Thread2 holds
642
// lock2, wants lock1
643

644     LockID l1 = new LockID("1");
645     LockID l2 = new LockID("2");
646     ChannelID c1 = new ChannelID(1);
647
648     ThreadID s1 = new ThreadID(1);
649     ThreadID s2 = new ThreadID(2);
650
651     ServerThreadID thread1 = new ServerThreadID(c1, s1);
652     ServerThreadID thread2 = new ServerThreadID(c1, s2);
653
654     lockManager.start();
655     // thread1 gets lock1
656
lockManager.requestLock(l1, c1, s1, LockLevel.WRITE, sink);
657     // thread2 gets lock2
658
lockManager.requestLock(l2, c1, s2, LockLevel.WRITE, sink);
659     // thread1 trys to get lock2 (blocks)
660
lockManager.requestLock(l2, c1, s1, LockLevel.WRITE, sink);
661     // thread2 trys to get lock1 (blocks)
662
lockManager.requestLock(l1, c1, s2, LockLevel.WRITE, sink);
663
664     TestDeadlockResults deadlocks = new TestDeadlockResults();
665     lockManager.scanForDeadlocks(deadlocks);
666
667     assertEquals(1, deadlocks.chains.size());
668     Map JavaDoc check = new HashMap JavaDoc();
669     check.put(thread1, l2);
670     check.put(thread2, l1);
671     assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0), check);
672
673     // test the mgmt interface too
674
DeadlockChain[] results = lockManager.scanForDeadlocks();
675     assertEquals(1, results.length);
676     check = new HashMap JavaDoc();
677     check.put(thread1, l2);
678     check.put(thread2, l1);
679     assertSpecificDeadlock(results[0], check);
680
681     lockManager.clearAllLocksFor(c1);
682   }
683
684   public void testDeadLock3() {
685     // test that includes locks with more than 1 holder
686

687     // contended locks
688
LockID l1 = new LockID("1");
689     LockID l2 = new LockID("2");
690
691     // uncontended read locks
692
LockID l3 = new LockID("3");
693     LockID l4 = new LockID("4");
694     LockID l5 = new LockID("5");
695
696     ChannelID c1 = new ChannelID(1);
697     ThreadID s1 = new ThreadID(1);
698     ThreadID s2 = new ThreadID(2);
699
700     ServerThreadID thread1 = new ServerThreadID(c1, s1);
701     ServerThreadID thread2 = new ServerThreadID(c1, s2);
702
703     lockManager.start();
704
705     // thread1 holds all three read locks, thread2 has 2 of them
706
lockManager.requestLock(l3, c1, s1, LockLevel.READ, sink);
707     lockManager.requestLock(l4, c1, s1, LockLevel.READ, sink);
708     lockManager.requestLock(l5, c1, s1, LockLevel.READ, sink);
709     lockManager.requestLock(l3, c1, s2, LockLevel.READ, sink);
710     lockManager.requestLock(l4, c1, s2, LockLevel.READ, sink);
711
712     // thread1 gets lock1
713
lockManager.requestLock(l1, c1, s1, LockLevel.WRITE, sink);
714     // thread2 gets lock2
715
lockManager.requestLock(l2, c1, s2, LockLevel.WRITE, sink);
716     // thread1 trys to get lock2 (blocks)
717
lockManager.requestLock(l2, c1, s1, LockLevel.WRITE, sink);
718     // thread2 trys to get lock1 (blocks)
719
lockManager.requestLock(l1, c1, s2, LockLevel.WRITE, sink);
720
721     TestDeadlockResults deadlocks = new TestDeadlockResults();
722     lockManager.scanForDeadlocks(deadlocks);
723
724     assertEquals(1, deadlocks.chains.size());
725     Map JavaDoc check = new HashMap JavaDoc();
726     check.put(thread1, l2);
727     check.put(thread2, l1);
728     assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0), check);
729
730     lockManager.clearAllLocksFor(c1);
731   }
732
733   public void testUpgradeDeadLock() {
734     // Detect deadlock in competing upgrades
735
LockID l1 = new LockID("L1");
736
737     ChannelID c0 = new ChannelID(0);
738     ThreadID s1 = new ThreadID(1);
739     ThreadID s2 = new ThreadID(2);
740
741     ServerThreadID thread1 = new ServerThreadID(c0, s1);
742     ServerThreadID thread2 = new ServerThreadID(c0, s2);
743
744     lockManager.start();
745
746     // thread1 gets lock1 (R)
747
lockManager.requestLock(l1, c0, s1, LockLevel.READ, sink);
748     // thread2 gets lock1 (R)
749
lockManager.requestLock(l1, c0, s2, LockLevel.READ, sink);
750
751     // thread1 requests upgrade
752
lockManager.requestLock(l1, c0, s1, LockLevel.WRITE, sink);
753     // thread2 requests upgrade
754
lockManager.requestLock(l1, c0, s2, LockLevel.WRITE, sink);
755
756     TestDeadlockResults deadlocks = new TestDeadlockResults();
757     lockManager.scanForDeadlocks(deadlocks);
758
759     assertEquals(1, deadlocks.chains.size());
760
761     Map JavaDoc check = new HashMap JavaDoc();
762     check.put(thread1, l1);
763     check.put(thread2, l1);
764     assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0), check);
765
766     lockManager.clearAllLocksFor(c0);
767   }
768
769   public void testLackOfDeadlock() throws InterruptedException JavaDoc {
770     lockManager.start();
771     for (int i = 0; i < 500; i++) {
772       internalTestLackofDeadlock(false);
773       resetLockManager(true);
774       internalTestLackofDeadlock(true);
775       resetLockManager(true);
776     }
777   }
778
779   private void internalTestLackofDeadlock(boolean useRealThreads) throws InterruptedException JavaDoc {
780     List JavaDoc threads = new ArrayList JavaDoc();
781
782     for (int t = 0; t < numThreads; t++) {
783       ChannelID cid = txns[t].getChannelID();
784       ThreadID tid = txns[t].getClientThreadID();
785
786       RandomRequest req = new RandomRequest(cid, tid);
787       if (useRealThreads) {
788         Thread JavaDoc thread = new Thread JavaDoc(req);
789         thread.start();
790         threads.add(thread);
791       } else {
792         req.run();
793       }
794     }
795
796     if (useRealThreads) {
797       for (Iterator JavaDoc iter = threads.iterator(); iter.hasNext();) {
798         Thread JavaDoc t = (Thread JavaDoc) iter.next();
799         t.join();
800       }
801     }
802
803     TestDeadlockResults results = new TestDeadlockResults();
804     lockManager.scanForDeadlocks(results);
805
806     assertEquals(0, results.chains.size());
807
808     for (int i = 0; i < txns.length; i++) {
809       lockManager.clearAllLocksFor(txns[i].getChannelID());
810     }
811   }
812
813   private class RandomRequest implements Runnable JavaDoc {
814     private final ChannelID cid;
815     private final ThreadID tid;
816
817     public RandomRequest(ChannelID cid, ThreadID tid) {
818       this.cid = cid;
819       this.tid = tid;
820     }
821
822     public void run() {
823       final int start = random.nextInt(numLocks);
824       final int howMany = random.nextInt(numLocks - start);
825
826       for (int i = 0; i < howMany; i++) {
827         LockID lock = locks[start + i];
828         boolean read = random.nextInt(10) < 8; // 80% reads
829
int level = read ? LockLevel.READ : LockLevel.WRITE;
830         boolean granted = lockManager.requestLock(lock, cid, tid, level, sink);
831         if (!granted) {
832           break;
833         }
834       }
835     }
836   }
837
838   private ServerThreadID[] makeUniqueTxns(int num) {
839     ServerThreadID[] rv = new ServerThreadID[num];
840     for (int i = 0; i < num; i++) {
841       rv[i] = new ServerThreadID(new ChannelID(i), new ThreadID(i));
842     }
843     return rv;
844   }
845
846   private LockID[] makeUniqueLocks(int num) {
847     LockID[] rv = new LockID[num];
848     for (int i = 0; i < num; i++) {
849       rv[i] = new LockID("lock-" + i);
850     }
851
852     return rv;
853   }
854
855   private void assertSpecificDeadlock(DeadlockChain chain, Map JavaDoc check) {
856     DeadlockChain start = chain;
857     do {
858       LockID lock = (LockID) check.remove(chain.getWaiter());
859       assertEquals(lock, chain.getWaitingOn());
860       chain = chain.getNextLink();
861     } while (chain != start);
862
863     assertEquals(0, check.size());
864   }
865
866   public void testDeadLock2() {
867     // A slightly more complicated deadlock:
868
// -- Thread1 holds lock1, wants lock2
869
// -- Thread2 holds lock2, wants lock3
870
// -- Thread3 holds lock3, wants lock1
871

872     LockID l1 = new LockID("L1");
873     LockID l2 = new LockID("L2");
874     LockID l3 = new LockID("L3");
875     ChannelID c0 = new ChannelID(0);
876     ThreadID s1 = new ThreadID(1);
877     ThreadID s2 = new ThreadID(2);
878     ThreadID s3 = new ThreadID(3);
879
880     ServerThreadID thread1 = new ServerThreadID(c0, s1);
881     ServerThreadID thread2 = new ServerThreadID(c0, s2);
882     ServerThreadID thread3 = new ServerThreadID(c0, s3);
883
884     lockManager.start();
885
886     // thread1 gets lock1
887
lockManager.requestLock(l1, c0, s1, LockLevel.WRITE, sink);
888     // thread2 gets lock2
889
lockManager.requestLock(l2, c0, s2, LockLevel.WRITE, sink);
890     // thread3 gets lock3
891
lockManager.requestLock(l3, c0, s3, LockLevel.WRITE, sink);
892
893     // thread1 trys to get lock2 (blocks)
894
lockManager.requestLock(l2, c0, s1, LockLevel.WRITE, sink);
895     // thread2 trys to get lock3 (blocks)
896
lockManager.requestLock(l3, c0, s2, LockLevel.WRITE, sink);
897     // thread3 trys to get lock1 (blocks)
898
lockManager.requestLock(l1, c0, s3, LockLevel.WRITE, sink);
899
900     TestDeadlockResults deadlocks = new TestDeadlockResults();
901     lockManager.scanForDeadlocks(deadlocks);
902
903     assertEquals(1, deadlocks.chains.size());
904
905     Map JavaDoc check = new HashMap JavaDoc();
906     check.put(thread1, l2);
907     check.put(thread2, l3);
908     check.put(thread3, l1);
909     assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0), check);
910
911     lockManager.clearAllLocksFor(c0);
912   }
913
914   private class ShutdownThread extends Thread JavaDoc {
915     private final LinkedQueue shutdownSteps;
916
917     private ShutdownThread(LinkedQueue shutdownSteps) {
918       this.shutdownSteps = shutdownSteps;
919     }
920
921     public void run() {
922       try {
923         shutdownSteps.put(new Object JavaDoc());
924         lockManager.stop();
925         shutdownSteps.put(new Object JavaDoc());
926       } catch (Exception JavaDoc e) {
927         e.printStackTrace();
928         fail();
929       }
930     }
931   }
932
933   private static class TestMessageChannel implements MessageChannel {
934
935     public TCSocketAddress getLocalAddress() {
936       throw new ImplementMe();
937     }
938
939     public TCSocketAddress getRemoteAddress() {
940       try {
941         return new TCSocketAddress("127.0.0.1", 6969);
942       } catch (UnknownHostException JavaDoc e) {
943         throw new RuntimeException JavaDoc(e);
944       }
945     }
946
947     public void addListener(ChannelEventListener listener) {
948       throw new ImplementMe();
949     }
950
951     public ChannelID getChannelID() {
952       throw new ImplementMe();
953     }
954
955     public boolean isOpen() {
956       throw new ImplementMe();
957     }
958
959     public boolean isClosed() {
960       throw new ImplementMe();
961     }
962
963     public TCMessage createMessage(TCMessageType type) {
964       throw new ImplementMe();
965     }
966
967     public Object JavaDoc getAttachment(String JavaDoc key) {
968       throw new ImplementMe();
969     }
970
971     public void addAttachment(String JavaDoc key, Object JavaDoc value, boolean replace) {
972       throw new ImplementMe();
973     }
974
975     public Object JavaDoc removeAttachment(String JavaDoc key) {
976       throw new ImplementMe();
977     }
978
979     public boolean isConnected() {
980       throw new ImplementMe();
981     }
982
983     public void send(TCNetworkMessage message) {
984       throw new ImplementMe();
985     }
986
987     public NetworkStackID open() {
988       throw new ImplementMe();
989     }
990
991     public void close() {
992       throw new ImplementMe();
993     }
994
995   }
996
997   private static class TestDeadlockResults implements DeadlockResults {
998     final List JavaDoc chains = new ArrayList JavaDoc();
999
1000    public void foundDeadlock(DeadlockChain chain) {
1001      chains.add(chain);
1002    }
1003  }
1004
1005}
1006
Popular Tags