KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > object > tx > RemoteTransactionManagerTest


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.object.tx;
6
7 import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;
8 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
9 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
10 import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef;
11
12 import com.tc.async.api.Sink;
13 import com.tc.bytes.TCByteBuffer;
14 import com.tc.exception.ImplementMe;
15 import com.tc.exception.TCRuntimeException;
16 import com.tc.logging.TCLogger;
17 import com.tc.logging.TCLogging;
18 import com.tc.net.protocol.tcm.ChannelEventListener;
19 import com.tc.net.protocol.tcm.ChannelIDProvider;
20 import com.tc.net.protocol.tcm.ClientMessageChannel;
21 import com.tc.net.protocol.tcm.TCMessageType;
22 import com.tc.object.MockTCObject;
23 import com.tc.object.ObjectID;
24 import com.tc.object.lockmanager.api.LockID;
25 import com.tc.object.logging.NullRuntimeLogger;
26 import com.tc.object.msg.AcknowledgeTransactionMessageFactory;
27 import com.tc.object.msg.ClientHandshakeMessageFactory;
28 import com.tc.object.msg.CommitTransactionMessageFactory;
29 import com.tc.object.msg.JMXMessage;
30 import com.tc.object.msg.LockRequestMessageFactory;
31 import com.tc.object.msg.ObjectIDBatchRequestMessageFactory;
32 import com.tc.object.msg.RequestManagedObjectMessageFactory;
33 import com.tc.object.msg.RequestRootMessageFactory;
34 import com.tc.object.net.DSOClientMessageChannel;
35 import com.tc.object.session.NullSessionManager;
36 import com.tc.object.session.SessionID;
37 import com.tc.util.SequenceID;
38 import com.tc.util.concurrent.NoExceptionLinkedQueue;
39 import com.tc.util.concurrent.ThreadUtil;
40
41 import java.util.ArrayList JavaDoc;
42 import java.util.Collection JavaDoc;
43 import java.util.HashMap JavaDoc;
44 import java.util.HashSet JavaDoc;
45 import java.util.Iterator JavaDoc;
46 import java.util.LinkedList JavaDoc;
47 import java.util.List JavaDoc;
48 import java.util.Map JavaDoc;
49 import java.util.Set JavaDoc;
50
51 import junit.framework.TestCase;
52
53 public class RemoteTransactionManagerTest extends TestCase {
54
55   private static final TCLogger logger = TCLogging.getLogger(RemoteTransactionManagerTest.class);
56
57   private RemoteTransactionManagerImpl manager;
58   private TestTransactionBatchFactory batchFactory;
59   private SynchronizedInt number;
60   private SynchronizedRef error;
61   private Map JavaDoc threads;
62   private LinkedQueue batchSendQueue;
63   private TransactionBatchAccounting batchAccounting;
64   private LockAccounting lockAccounting;
65
66   public void setUp() throws Exception JavaDoc {
67     batchFactory = new TestTransactionBatchFactory();
68     batchAccounting = new TransactionBatchAccounting();
69     lockAccounting = new LockAccounting();
70     manager = new RemoteTransactionManagerImpl(logger, batchFactory, batchAccounting, lockAccounting,
71                                                new NullSessionManager(), new MockChannel());
72     number = new SynchronizedInt(0);
73     error = new SynchronizedRef(null);
74     threads = new HashMap JavaDoc();
75     batchSendQueue = new LinkedQueue();
76   }
77
78   public void tearDown() throws Exception JavaDoc {
79     if (error.get() != null) {
80       Throwable JavaDoc t = (Throwable JavaDoc) error.get();
81       fail(t.getMessage());
82     }
83     for (Iterator JavaDoc i = batchAccounting.addIncompleteTransactionIDsTo(new LinkedList JavaDoc()).iterator(); i.hasNext();) {
84       TransactionID txID = (TransactionID) i.next();
85       manager.receivedAcknowledgement(SessionID.NULL_ID, txID);
86     }
87     batchAccounting.clear();
88     batchAccounting.stop();
89     manager.clear();
90   }
91
92   public void testFlush() throws Exception JavaDoc {
93     final LockID lockID1 = new LockID("lock1");
94     manager.flush(lockID1);
95     TestClientTransaction tx1 = new TestClientTransaction();
96     tx1.txID = new TransactionID(1);
97     tx1.lockID = lockID1;
98     tx1.allLockIDs.add(lockID1);
99     tx1.txnType = TxnType.NORMAL;
100     manager.commit(tx1);
101     final NoExceptionLinkedQueue flushCalls = new NoExceptionLinkedQueue();
102     Runnable JavaDoc flusher = new Runnable JavaDoc() {
103       public void run() {
104         manager.flush(lockID1);
105         flushCalls.put(lockID1);
106       }
107     };
108
109     new Thread JavaDoc(flusher).start();
110     // XXX: Figure out how to do this without a timeout.
111
int timeout = 5 * 1000;
112     System.err.println("About too wait for " + timeout + " ms.");
113     assertNull(flushCalls.poll(timeout));
114
115     manager.receivedAcknowledgement(SessionID.NULL_ID, tx1.getTransactionID());
116     assertEquals(lockID1, flushCalls.take());
117
118     TestClientTransaction tx2 = tx1;
119     tx2.txID = new TransactionID(2);
120
121     // make sure flush falls through if the acknowledgement is received before the flush is called.
122
manager.commit(tx1);
123     manager.receivedAcknowledgement(SessionID.NULL_ID, tx2.getTransactionID());
124     new Thread JavaDoc(flusher).start();
125     assertEquals(lockID1, flushCalls.take());
126   }
127
128   public void testSendAckedGlobalTransactionIDs() throws Exception JavaDoc {
129     assertTrue(batchSendQueue.isEmpty());
130     Set JavaDoc acknowledged = new HashSet JavaDoc();
131     ClientTransaction ctx = makeTransaction();
132     CyclicBarrier barrier = new CyclicBarrier(2);
133
134     callCommitOnThread(ctx, barrier);
135     barrier.barrier();
136     ThreadUtil.reallySleep(500);
137     TestTransactionBatch batch = (TestTransactionBatch) batchFactory.newBatchQueue.poll(1);
138     assertNotNull(batch);
139     assertSame(batch, batchSendQueue.poll(1));
140     assertTrue(batchSendQueue.isEmpty());
141
142     assertEquals(acknowledged, batch.ackedTransactions);
143
144     // fill the current batch with a bunch of transactions
145
int count = 10;
146     for (int i = 0; i < count; i++) {
147       ClientTransaction ctx1 = makeTransaction();
148       barrier = new CyclicBarrier(2);
149       callCommitOnThread(ctx1, barrier);
150       barrier.barrier();
151       ThreadUtil.reallySleep(500);
152     }
153
154     List JavaDoc batches = new ArrayList JavaDoc();
155     TestTransactionBatch batch1;
156     while ((batch1 = (TestTransactionBatch) batchSendQueue.poll(3000)) != null) {
157       System.err.println("Recd batch " + batch1);
158       batches.add(batch1);
159     }
160
161     // acknowledge the first transaction
162
manager.receivedAcknowledgement(SessionID.NULL_ID, ctx.getTransactionID());
163     acknowledged.add(ctx.getTransactionID());
164
165     manager.receivedBatchAcknowledgement(batch.batchID);
166
167     // the batch ack should have sent another batch
168
batch = (TestTransactionBatch) batchSendQueue.poll(1);
169     assertNotNull(batch);
170     assertTrue(batchSendQueue.isEmpty());
171
172     // The set of acked transactions in the batch should be everything we've
173
// acknowledged so far.
174
assertEquals(acknowledged, batch.ackedTransactions);
175
176     acknowledged.clear();
177
178     ctx = makeTransaction();
179     barrier = new CyclicBarrier(2);
180     callCommitOnThread(ctx, barrier);
181     barrier.barrier();
182     ThreadUtil.reallySleep(500);
183
184     // acknowledge the remaining batches so the current batch will get sent.
185
for (Iterator JavaDoc i = batches.iterator(); i.hasNext();) {
186       batch1 = (TestTransactionBatch) i.next();
187       manager.receivedBatchAcknowledgement(batch1.batchID);
188     }
189     manager.receivedBatchAcknowledgement(batch.batchID);
190
191     batch = (TestTransactionBatch) batchSendQueue.poll(1);
192     assertNotNull(batch);
193     assertTrue(batchSendQueue.isEmpty());
194     // The set of acked transactions should now be empty, since we've already
195
// sent them down to the server
196
assertEquals(acknowledged, batch.ackedTransactions);
197   }
198
199   public void testResendOutstandingBasics() throws Exception JavaDoc {
200     System.err.println("Testing testResendOutstandingBasics ...");
201     final Set JavaDoc batchTxs = new HashSet JavaDoc();
202
203     final int maxBatchesOutstanding = manager.getMaxOutStandingBatches();
204     final List JavaDoc batches = new ArrayList JavaDoc();
205
206     TestTransactionBatch batchN;
207     for (int i = 0; i < maxBatchesOutstanding; i++) {
208       makeAndCommitTransactions(batchTxs, 1);
209       batchN = (TestTransactionBatch) batchSendQueue.take();
210       System.err.println("* Recd " + batchN);
211       assertEquals(batchN, getNextNewBatch());
212       assertTrue(batchSendQueue.isEmpty());
213       batches.add(batchN);
214       assertEquals(1, batchN.transactions.size());
215     }
216
217     final int num = 5;
218     // These txns are not gonna be sent as we already reached the max Batches outstanding count
219
makeAndCommitTransactions(batchTxs, num);
220     ThreadUtil.reallySleep(2000);
221     assertTrue(batchSendQueue.isEmpty());
222
223     // Resend outstanding batches
224
restart(manager);
225
226     // Make sure the batches get resent
227
for (int i = batches.size(); i > 0; i--) {
228       assertTrue(batches.contains(batchSendQueue.take()));
229     }
230     assertTrue(batchSendQueue.isEmpty());
231
232     // ACK batch 1; next batch (batch 3) will be sent.
233
manager.receivedBatchAcknowledgement(((TestTransactionBatch) batches.get(0)).batchID);
234     while ((batchN = (TestTransactionBatch) batchSendQueue.poll(3000)) != null) {
235       System.err.println("** Recd " + batchN);
236       batches.add(batchN);
237       getNextNewBatch();
238     }
239
240     // Resend outstanding batches
241
restart(manager);
242
243     // This time, all batches + batch 3 should get resent
244
List JavaDoc sent = (List JavaDoc) drainQueueInto(batchSendQueue, new LinkedList JavaDoc());
245     assertEquals(batches.size(), sent.size());
246     assertTrue(sent.containsAll(batches));
247
248     // make some new transactions that should go into next batch
249
makeAndCommitTransactions(batchTxs, num);
250
251     // ACK all the transactions in batch1
252
Collection JavaDoc batch1Txs = ((TestTransactionBatch) batches.get(0)).addTransactionIDsTo(new HashSet JavaDoc());
253     for (Iterator JavaDoc i = batch1Txs.iterator(); i.hasNext();) {
254       TransactionID txnId = (TransactionID) i.next();
255       batchTxs.remove(txnId);
256       manager.receivedAcknowledgement(SessionID.NULL_ID, txnId);
257     }
258     batches.remove(0);
259
260     // resend
261
restart(manager);
262
263     // This time, batches except batch 1 should get resent
264
sent = (List JavaDoc) drainQueueInto(batchSendQueue, new LinkedList JavaDoc());
265     assertEquals(batches.size(), sent.size());
266     assertTrue(sent.containsAll(batches));
267
268     // ACK all other batches
269
for (Iterator JavaDoc i = batches.iterator(); i.hasNext();) {
270       batchN = (TestTransactionBatch) i.next();
271       manager.receivedBatchAcknowledgement(batchN.batchID);
272     }
273
274     while ((batchN = (TestTransactionBatch) batchSendQueue.poll(3000)) != null) {
275       System.err.println("*** Recd " + batchN);
276       batches.add(batchN);
277       getNextNewBatch();
278     }
279
280     // resend
281
restart(manager);
282
283     // This time, batches except batch 1 should get resent
284
sent = (List JavaDoc) drainQueueInto(batchSendQueue, new LinkedList JavaDoc());
285     assertEquals(batches.size(), sent.size());
286     assertTrue(sent.containsAll(batches));
287
288     // now make sure that the manager re-sends an outstanding batch until all of
289
// its transactions have been acked.
290
while (batches.size() > 0) {
291       Collection JavaDoc batchNTxs = ((TestTransactionBatch) batches.get(0)).addTransactionIDsTo(new HashSet JavaDoc());
292       for (Iterator JavaDoc i = batchNTxs.iterator(); i.hasNext();) {
293         TransactionID txnId = (TransactionID) i.next();
294         batchTxs.remove(txnId);
295         manager.receivedAcknowledgement(SessionID.NULL_ID, txnId);
296         restart(manager);
297         sent = (List JavaDoc) drainQueueInto(batchSendQueue, new LinkedList JavaDoc());
298         if (i.hasNext()) {
299           // There are still un-ACKed transactions in this batch.
300
assertEquals(batches.size(), sent.size());
301           assertTrue(batches.containsAll(sent));
302         } else {
303           // all the transactions have been ACKed, so current batch (batch 4 should be sent)
304
batches.remove(0);
305           assertEquals(batches.size(), sent.size());
306           assertTrue(batches.containsAll(sent));
307         }
308       }
309     }
310   }
311
312   private void restart(RemoteTransactionManager manager2) {
313     manager2.pause();
314     manager2.starting();
315     manager2.resendOutstandingAndUnpause();
316
317   }
318
319   private void makeAndCommitTransactions(final Set JavaDoc created, final int count) throws InterruptedException JavaDoc {
320     CyclicBarrier commitBarrier = new CyclicBarrier(count + 1);
321     for (int i = 0; i < count; i++) {
322       ClientTransaction tx = makeTransaction();
323       created.add(tx);
324       callCommitOnThread(tx, commitBarrier);
325     }
326     // make sure all the threads have at least started...
327
commitBarrier.barrier();
328     // sleep a little bit to make sure they get to the commit() call.
329
ThreadUtil.reallySleep(1000);
330   }
331
332   public void testBatching() throws InterruptedException JavaDoc {
333
334     System.err.println("Testing testBatching ...");
335
336     final int maxBatchesOutstanding = manager.getMaxOutStandingBatches();
337     TestTransactionBatch batchN;
338     final Set JavaDoc batchTxs = new HashSet JavaDoc();
339     final List JavaDoc batches = new ArrayList JavaDoc();
340
341     for (int i = 0; i < maxBatchesOutstanding; i++) {
342       makeAndCommitTransactions(batchTxs, 1);
343       batchN = (TestTransactionBatch) batchSendQueue.take();
344       System.err.println("* Recd " + batchN);
345       assertEquals(batchN, getNextNewBatch());
346       assertTrue(batchSendQueue.isEmpty());
347       batches.add(batchN);
348       assertEquals(1, batchN.transactions.size());
349       assertTrue(batchTxs.containsAll(batchN.transactions));
350     }
351
352     final int num = 10;
353
354     // create more transactions on the client side (they should all get batched
355
// locally)
356
List JavaDoc batch2Txs = new ArrayList JavaDoc();
357     CyclicBarrier barrier = new CyclicBarrier(num + 1);
358     for (int i = 1; i <= num; i++) {
359       ClientTransaction txn = makeTransaction();
360       batch2Txs.add(txn);
361       callCommitOnThread(txn, barrier);
362     }
363     batchTxs.addAll(batch2Txs);
364
365     barrier.barrier();
366     assertFalse(barrier.broken());
367     ThreadUtil.reallySleep(2000);
368     assertTrue(batchSendQueue.isEmpty());
369
370     // Make sure the rest transactions get into the second batch
371
TestTransactionBatch batch2 = getNextNewBatch();
372     Collection JavaDoc txnsInBatch = drainQueueInto(batch2.addTxQueue, new HashSet JavaDoc());
373     assertTrue(batch2Txs.size() == txnsInBatch.size());
374     txnsInBatch.removeAll(batch2Txs);
375     assertTrue(txnsInBatch.size() == 0);
376     assertTrue(batch2.addTxQueue.isEmpty());
377
378     TestTransactionBatch batch1 = ((TestTransactionBatch) batches.remove(0));
379
380     // ACK one of the batch (triggers send of next batch)
381
manager.receivedBatchAcknowledgement(batch1.batchID);
382     // make sure that the batch sent is what we expected.
383
assertSame(batch2, batchSendQueue.take());
384
385     TestTransactionBatch batch3 = getNextNewBatch();
386
387     // ACK another batch (no more TXNs to send this time)
388
assertTrue(batchSendQueue.isEmpty());
389     manager.receivedBatchAcknowledgement(batch2.batchID);
390     assertTrue(batchSendQueue.isEmpty());
391     for (Iterator JavaDoc i = batches.iterator(); i.hasNext();) {
392       TestTransactionBatch b = (TestTransactionBatch) i.next();
393       manager.receivedBatchAcknowledgement(b.batchID);
394       assertTrue(batchSendQueue.isEmpty());
395     }
396
397     for (Iterator JavaDoc i = batchTxs.iterator(); i.hasNext();) {
398       ClientTransaction txn = (ClientTransaction) i.next();
399       manager.receivedAcknowledgement(SessionID.NULL_ID, txn.getTransactionID());
400       assertTrue(batchSendQueue.isEmpty());
401     }
402
403     // There should still be no batch to send.
404
assertTrue(batchSendQueue.isEmpty());
405     assertTrue(drainQueueInto(batch3.addTxQueue, new LinkedList JavaDoc()).isEmpty());
406   }
407
408   private Collection JavaDoc drainQueueInto(LinkedQueue queue, Collection JavaDoc dest) throws InterruptedException JavaDoc {
409     while (!queue.isEmpty()) {
410       dest.add(queue.take());
411     }
412     return dest;
413   }
414
415   private TestTransactionBatch getNextNewBatch() throws InterruptedException JavaDoc {
416     TestTransactionBatch rv = (TestTransactionBatch) batchFactory.newBatchQueue.take();
417     return rv;
418   }
419
420   private synchronized void callCommitOnThread(final ClientTransaction txn, final CyclicBarrier barrier) {
421     TransactionID txnID = txn.getTransactionID();
422
423     Thread JavaDoc t = new Thread JavaDoc("Commit for txn #" + txnID.toLong()) {
424       public void run() {
425         try {
426           barrier.barrier();
427           manager.commit(txn);
428         } catch (Throwable JavaDoc th) {
429           th.printStackTrace();
430           error.set(th);
431         }
432       }
433     };
434
435     threads.put(txnID, t);
436     t.start();
437   }
438
439   private ClientTransaction makeTransaction() {
440     int num = number.increment();
441     LockID lid = new LockID("lock" + num);
442     TransactionContext tc = new TransactionContext(lid, TxnType.NORMAL, new LockID[] { lid });
443     ClientTransaction txn = new ClientTransactionImpl(new TransactionID(num), new NullRuntimeLogger(), null);
444     txn.setTransactionContext(tc);
445     txn.fieldChanged(new MockTCObject(new ObjectID(num), this), "class", "class.field", new ObjectID(num), -1);
446     return txn;
447   }
448
449   private final class TestTransactionBatch implements ClientTransactionBatch {
450
451     public final Set JavaDoc ackedTransactions = new HashSet JavaDoc();
452
453     public final TxnBatchID batchID;
454
455     public final LinkedQueue addTxQueue = new LinkedQueue();
456     private final LinkedList JavaDoc transactions = new LinkedList JavaDoc();
457
458     public TestTransactionBatch(TxnBatchID batchID) {
459       this.batchID = batchID;
460     }
461
462     public String JavaDoc toString() {
463       return "TestTransactionBatch[" + batchID + "] = Txn [ " + transactions + " ]";
464     }
465
466     public synchronized boolean isEmpty() {
467       return transactions.isEmpty();
468     }
469
470     public int numberOfTxns() {
471       return transactions.size();
472     }
473
474     public boolean isNull() {
475       return false;
476     }
477
478     public synchronized void addTransaction(ClientTransaction txn) {
479       try {
480         addTxQueue.put(txn);
481         transactions.add(txn);
482       } catch (InterruptedException JavaDoc e) {
483         throw new TCRuntimeException(e);
484       }
485     }
486
487     public void removeTransaction(TransactionID txID) {
488       return;
489     }
490
491     public Collection JavaDoc addTransactionIDsTo(Collection JavaDoc c) {
492       for (Iterator JavaDoc i = transactions.iterator(); i.hasNext();) {
493         ClientTransaction txn = (ClientTransaction) i.next();
494         c.add(txn.getTransactionID());
495       }
496       return c;
497     }
498
499     public void send() {
500       try {
501         batchSendQueue.put(this);
502       } catch (InterruptedException JavaDoc e) {
503         throw new TCRuntimeException(e);
504       }
505     }
506
507     public TCByteBuffer[] getData() {
508       return null;
509     }
510
511     public void addAcknowledgedTransactionIDs(Collection JavaDoc acknowledged) {
512       ackedTransactions.addAll(acknowledged);
513     }
514
515     public Collection JavaDoc getAcknowledgedTransactionIDs() {
516       throw new ImplementMe();
517     }
518
519     public TxnBatchID getTransactionBatchID() {
520       return this.batchID;
521     }
522
523     public SequenceID getMinTransactionSequence() {
524       throw new ImplementMe();
525     }
526
527     public void recycle() {
528       return;
529     }
530
531     public Collection JavaDoc addTransactionSequenceIDsTo(Collection JavaDoc sequenceIDs) {
532       for (Iterator JavaDoc i = transactions.iterator(); i.hasNext();) {
533         ClientTransaction txn = (ClientTransaction) i.next();
534         sequenceIDs.add(txn.getSequenceID());
535       }
536       return sequenceIDs;
537     }
538
539     public String JavaDoc dump() {
540       return "TestTransactionBatch";
541     }
542
543     public int byteSize() {
544       return 64000;
545     }
546
547   }
548
549   private final class TestTransactionBatchFactory implements TransactionBatchFactory {
550     private long idSequence;
551     public final LinkedQueue newBatchQueue = new LinkedQueue();
552
553     public synchronized ClientTransactionBatch nextBatch() {
554       ClientTransactionBatch rv = new TestTransactionBatch(new TxnBatchID(++idSequence));
555       try {
556         newBatchQueue.put(rv);
557       } catch (InterruptedException JavaDoc e) {
558         throw new TCRuntimeException(e);
559       }
560       return rv;
561     }
562   }
563
564   private static class MockChannel implements DSOClientMessageChannel {
565
566     public void addClassMapping(TCMessageType messageType, Class JavaDoc messageClass) {
567       throw new ImplementMe();
568     }
569
570     public void addListener(ChannelEventListener listener) {
571       throw new ImplementMe();
572     }
573
574     public ClientMessageChannel channel() {
575       throw new ImplementMe();
576     }
577
578     public void close() {
579       throw new ImplementMe();
580     }
581
582     public AcknowledgeTransactionMessageFactory getAcknowledgeTransactionMessageFactory() {
583       throw new ImplementMe();
584     }
585
586     public ChannelIDProvider getChannelIDProvider() {
587       throw new ImplementMe();
588     }
589
590     public ClientHandshakeMessageFactory getClientHandshakeMessageFactory() {
591       throw new ImplementMe();
592     }
593
594     public CommitTransactionMessageFactory getCommitTransactionMessageFactory() {
595       throw new ImplementMe();
596     }
597
598     public LockRequestMessageFactory getLockRequestMessageFactory() {
599       throw new ImplementMe();
600     }
601
602     public ObjectIDBatchRequestMessageFactory getObjectIDBatchRequestMessageFactory() {
603       throw new ImplementMe();
604     }
605
606     public RequestManagedObjectMessageFactory getRequestManagedObjectMessageFactory() {
607       throw new ImplementMe();
608     }
609
610     public RequestRootMessageFactory getRequestRootMessageFactory() {
611       throw new ImplementMe();
612     }
613
614     public boolean isConnected() {
615       throw new ImplementMe();
616     }
617
618     public void open() {
619       throw new ImplementMe();
620     }
621
622     public void routeMessageType(TCMessageType messageType, Sink destSink, Sink hydrateSink) {
623       throw new ImplementMe();
624     }
625
626     public JMXMessage getJMXMessage() {
627       throw new ImplementMe();
628     }
629   }
630
631 }
632
Popular Tags