package com.tc.object.tx;

import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef;

import com.tc.async.api.Sink;
import com.tc.bytes.TCByteBuffer;
import com.tc.exception.ImplementMe;
import com.tc.exception.TCRuntimeException;
import com.tc.logging.TCLogger;
import com.tc.logging.TCLogging;
import com.tc.net.protocol.tcm.ChannelEventListener;
import com.tc.net.protocol.tcm.ChannelIDProvider;
import com.tc.net.protocol.tcm.ClientMessageChannel;
import com.tc.net.protocol.tcm.TCMessageType;
import com.tc.object.MockTCObject;
import com.tc.object.ObjectID;
import com.tc.object.lockmanager.api.LockID;
import com.tc.object.logging.NullRuntimeLogger;
import com.tc.object.msg.AcknowledgeTransactionMessageFactory;
import com.tc.object.msg.ClientHandshakeMessageFactory;
import com.tc.object.msg.CommitTransactionMessageFactory;
import com.tc.object.msg.JMXMessage;
import com.tc.object.msg.LockRequestMessageFactory;
import com.tc.object.msg.ObjectIDBatchRequestMessageFactory;
import com.tc.object.msg.RequestManagedObjectMessageFactory;
import com.tc.object.msg.RequestRootMessageFactory;
import com.tc.object.net.DSOClientMessageChannel;
import com.tc.object.session.NullSessionManager;
import com.tc.object.session.SessionID;
import com.tc.util.SequenceID;
import com.tc.util.concurrent.NoExceptionLinkedQueue;
import com.tc.util.concurrent.ThreadUtil;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import junit.framework.TestCase;

/**
 * Tests for {@link RemoteTransactionManagerImpl}.
 * <p>
 * The manager under test is wired to a {@link TestTransactionBatchFactory} whose batches record every transaction added
 * to them and, instead of going over the network, enqueue themselves on {@link #batchSendQueue} when
 * <code>send()</code> is called. Tests commit transactions on background threads and then inspect the two queues
 * (newly created batches, sent batches) to check what the manager did.
 */
public class RemoteTransactionManagerTest extends TestCase {

  private static final TCLogger        logger = TCLogging.getLogger(RemoteTransactionManagerTest.class);

  private RemoteTransactionManagerImpl manager;
  private TestTransactionBatchFactory  batchFactory;
  // Source of unique transaction / lock / object numbers for makeTransaction().
  private SynchronizedInt              number;
  // First-or-last Throwable raised on a commit thread; checked in tearDown().
  private SynchronizedRef              error;
  // TransactionID -> Thread that is committing it (populated by callCommitOnThread()).
  private Map                          threads;
  // Every TestTransactionBatch on which send() was invoked, in call order.
  private LinkedQueue                  batchSendQueue;
  private TransactionBatchAccounting   batchAccounting;
  private LockAccounting               lockAccounting;

  public void setUp() throws Exception {
    batchFactory = new TestTransactionBatchFactory();
    batchAccounting = new TransactionBatchAccounting();
    lockAccounting = new LockAccounting();
    manager = new RemoteTransactionManagerImpl(logger, batchFactory, batchAccounting, lockAccounting,
                                               new NullSessionManager(), new MockChannel());
    number = new SynchronizedInt(0);
    error = new SynchronizedRef(null);
    threads = new HashMap();
    batchSendQueue = new LinkedQueue();
  }

  /**
   * Fails the test if any commit thread recorded an error, then acknowledges every still-incomplete transaction and
   * shuts the accounting and manager down. The cleanup runs in a <code>finally</code> so that it is not skipped when
   * the error check fails.
   */
  public void tearDown() throws Exception {
    try {
      Throwable t = (Throwable) error.get();
      if (t != null) {
        // Include the throwable itself: getMessage() alone may be null and hides the exception type.
        fail("Commit thread failed: " + t);
      }
    } finally {
      for (Iterator i = batchAccounting.addIncompleteTransactionIDsTo(new LinkedList()).iterator(); i.hasNext();) {
        TransactionID txID = (TransactionID) i.next();
        manager.receivedAcknowledgement(SessionID.NULL_ID, txID);
      }
      batchAccounting.clear();
      batchAccounting.stop();
      manager.clear();
    }
  }

  /**
   * Checks that <code>flush(lockID)</code> returns immediately when nothing is outstanding for the lock, does not
   * return while a committed transaction for that lock is unacknowledged, and returns once the acknowledgement arrives.
   */
  public void testFlush() throws Exception {
    final LockID lockID1 = new LockID("lock1");
    // Nothing committed yet: this call must return on its own or the test hangs here.
    manager.flush(lockID1);

    TestClientTransaction tx1 = new TestClientTransaction();
    tx1.txID = new TransactionID(1);
    tx1.lockID = lockID1;
    tx1.allLockIDs.add(lockID1);
    tx1.txnType = TxnType.NORMAL;
    manager.commit(tx1);

    final NoExceptionLinkedQueue flushCalls = new NoExceptionLinkedQueue();
    Runnable flusher = new Runnable() {
      public void run() {
        manager.flush(lockID1);
        // Only reached once flush() has returned.
        flushCalls.put(lockID1);
      }
    };

    new Thread(flusher).start();
    int timeout = 5 * 1000;
    System.err.println("About too wait for " + timeout + " ms.");
    // tx1 has not been acknowledged, so the flusher thread must not have completed within the timeout.
    assertNull(flushCalls.poll(timeout));

    manager.receivedAcknowledgement(SessionID.NULL_ID, tx1.getTransactionID());
    assertEquals(lockID1, flushCalls.take());

    // tx2 is the SAME object as tx1, re-labelled with a new transaction id.
    TestClientTransaction tx2 = tx1;
    tx2.txID = new TransactionID(2);

    manager.commit(tx2);
    // Acknowledge before flushing: the flusher should then complete without further input.
    manager.receivedAcknowledgement(SessionID.NULL_ID, tx2.getTransactionID());
    new Thread(flusher).start();
    assertEquals(lockID1, flushCalls.take());
  }

  /**
   * Checks which acknowledged transaction ids the manager hands to a batch (via
   * <code>addAcknowledgedTransactionIDs</code>) before that batch is sent.
   */
  public void testSendAckedGlobalTransactionIDs() throws Exception {
    assertTrue(batchSendQueue.isEmpty());
    Set acknowledged = new HashSet();
    ClientTransaction ctx = makeTransaction();
    CyclicBarrier barrier = new CyclicBarrier(2);

    callCommitOnThread(ctx, barrier);
    barrier.barrier();
    ThreadUtil.reallySleep(500);
    TestTransactionBatch batch = (TestTransactionBatch) batchFactory.newBatchQueue.poll(1);
    assertNotNull(batch);
    assertSame(batch, batchSendQueue.poll(1));
    assertTrue(batchSendQueue.isEmpty());

    // Nothing has been acknowledged yet, so the first batch carries no acked ids.
    assertEquals(acknowledged, batch.ackedTransactions);

    // Commit more transactions, one at a time, while the first batch is still outstanding.
    int count = 10;
    for (int i = 0; i < count; i++) {
      ClientTransaction ctx1 = makeTransaction();
      barrier = new CyclicBarrier(2);
      callCommitOnThread(ctx1, barrier);
      barrier.barrier();
      ThreadUtil.reallySleep(500);
    }

    // Collect whatever the manager sent in the meantime.
    List batches = new ArrayList();
    TestTransactionBatch batch1;
    while ((batch1 = (TestTransactionBatch) batchSendQueue.poll(3000)) != null) {
      System.err.println("Recd batch " + batch1);
      batches.add(batch1);
    }

    // Acknowledge the first transaction ...
    manager.receivedAcknowledgement(SessionID.NULL_ID, ctx.getTransactionID());
    acknowledged.add(ctx.getTransactionID());

    // ... and the first batch, which is expected to trigger the send of exactly one more batch.
    manager.receivedBatchAcknowledgement(batch.batchID);

    batch = (TestTransactionBatch) batchSendQueue.poll(1);
    assertNotNull(batch);
    assertTrue(batchSendQueue.isEmpty());

    // That batch must carry the id acknowledged above.
    assertEquals(acknowledged, batch.ackedTransactions);

    acknowledged.clear();

    ctx = makeTransaction();
    barrier = new CyclicBarrier(2);
    callCommitOnThread(ctx, barrier);
    barrier.barrier();
    ThreadUtil.reallySleep(500);

    // Acknowledge every batch sent so far.
    for (Iterator i = batches.iterator(); i.hasNext();) {
      batch1 = (TestTransactionBatch) i.next();
      manager.receivedBatchAcknowledgement(batch1.batchID);
    }
    manager.receivedBatchAcknowledgement(batch.batchID);

    batch = (TestTransactionBatch) batchSendQueue.poll(1);
    assertNotNull(batch);
    assertTrue(batchSendQueue.isEmpty());
    // No new transaction acknowledgements since the last send, so this batch carries none.
    assertEquals(acknowledged, batch.ackedTransactions);
  }

  /**
   * Checks that after a pause / starting / resendOutstandingAndUnpause cycle (see {@link #restart}) the manager
   * re-sends exactly the batches that are still outstanding.
   */
  public void testResendOutstandingBasics() throws Exception {
    System.err.println("Testing testResendOutstandingBasics ...");
    final Set batchTxs = new HashSet();

    final int maxBatchesOutstanding = manager.getMaxOutStandingBatches();
    final List batches = new ArrayList();

    // Fill the outstanding-batch window with single-transaction batches.
    TestTransactionBatch batchN;
    for (int i = 0; i < maxBatchesOutstanding; i++) {
      makeAndCommitTransactions(batchTxs, 1);
      batchN = (TestTransactionBatch) batchSendQueue.take();
      System.err.println("* Recd " + batchN);
      assertEquals(batchN, getNextNewBatch());
      assertTrue(batchSendQueue.isEmpty());
      batches.add(batchN);
      assertEquals(1, batchN.transactions.size());
    }

    final int num = 5;
    // These commits must not cause a send while the window is full.
    makeAndCommitTransactions(batchTxs, num);
    ThreadUtil.reallySleep(2000);
    assertTrue(batchSendQueue.isEmpty());

    restart(manager);

    // Every outstanding batch is sent again, and nothing else.
    for (int i = batches.size(); i > 0; i--) {
      assertTrue(batches.contains(batchSendQueue.take()));
    }
    assertTrue(batchSendQueue.isEmpty());

    // Acknowledge the first batch and record whatever gets sent as a result.
    manager.receivedBatchAcknowledgement(((TestTransactionBatch) batches.get(0)).batchID);
    while ((batchN = (TestTransactionBatch) batchSendQueue.poll(3000)) != null) {
      System.err.println("** Recd " + batchN);
      batches.add(batchN);
      getNextNewBatch();
    }

    restart(manager);

    List sent = (List) drainQueueInto(batchSendQueue, new LinkedList());
    assertEquals(batches.size(), sent.size());
    assertTrue(sent.containsAll(batches));

    makeAndCommitTransactions(batchTxs, num);

    // Acknowledge every transaction of the first batch, after which that batch is no longer expected on resend.
    Collection batch1Txs = ((TestTransactionBatch) batches.get(0)).addTransactionIDsTo(new HashSet());
    for (Iterator i = batch1Txs.iterator(); i.hasNext();) {
      TransactionID txnId = (TransactionID) i.next();
      // NOTE(review): batchTxs is filled with ClientTransaction objects by makeAndCommitTransactions(), so removing a
      // TransactionID here presumably never matches. Nothing in this test asserts on batchTxs, so it is left as is.
      batchTxs.remove(txnId);
      manager.receivedAcknowledgement(SessionID.NULL_ID, txnId);
    }
    batches.remove(0);

    restart(manager);

    sent = (List) drainQueueInto(batchSendQueue, new LinkedList());
    assertEquals(batches.size(), sent.size());
    assertTrue(sent.containsAll(batches));

    // Batch-acknowledge everything that is still tracked ...
    for (Iterator i = batches.iterator(); i.hasNext();) {
      batchN = (TestTransactionBatch) i.next();
      manager.receivedBatchAcknowledgement(batchN.batchID);
    }

    // ... and record the batches this releases.
    while ((batchN = (TestTransactionBatch) batchSendQueue.poll(3000)) != null) {
      System.err.println("*** Recd " + batchN);
      batches.add(batchN);
      getNextNewBatch();
    }

    restart(manager);

    sent = (List) drainQueueInto(batchSendQueue, new LinkedList());
    assertEquals(batches.size(), sent.size());
    assertTrue(sent.containsAll(batches));

    // Acknowledge the remaining transactions one by one, restarting after each. A batch keeps being re-sent until
    // its last transaction has been acknowledged.
    while (batches.size() > 0) {
      Collection batchNTxs = ((TestTransactionBatch) batches.get(0)).addTransactionIDsTo(new HashSet());
      for (Iterator i = batchNTxs.iterator(); i.hasNext();) {
        TransactionID txnId = (TransactionID) i.next();
        batchTxs.remove(txnId); // see NOTE(review) above: likely a no-op
        manager.receivedAcknowledgement(SessionID.NULL_ID, txnId);
        restart(manager);
        sent = (List) drainQueueInto(batchSendQueue, new LinkedList());
        if (i.hasNext()) {
          // Batch still has unacknowledged transactions: it is still re-sent.
          assertEquals(batches.size(), sent.size());
          assertTrue(batches.containsAll(sent));
        } else {
          // Last transaction acknowledged: the batch must have dropped out of the resend set.
          batches.remove(0);
          assertEquals(batches.size(), sent.size());
          assertTrue(batches.containsAll(sent));
        }
      }
    }
  }

  /**
   * Drives the manager through pause, starting and resendOutstandingAndUnpause, in that order.
   */
  private void restart(RemoteTransactionManager manager2) {
    manager2.pause();
    manager2.starting();
    manager2.resendOutstandingAndUnpause();
  }

  /**
   * Creates <code>count</code> transactions, starts one commit thread per transaction and releases them all at once.
   * 
   * @param created receives the created {@link ClientTransaction} objects
   * @param count number of transactions to create and commit
   */
  private void makeAndCommitTransactions(final Set created, final int count) throws InterruptedException {
    // count commit threads + this thread
    CyclicBarrier commitBarrier = new CyclicBarrier(count + 1);
    for (int i = 0; i < count; i++) {
      ClientTransaction tx = makeTransaction();
      created.add(tx);
      callCommitOnThread(tx, commitBarrier);
    }
    // Release the commit threads ...
    commitBarrier.barrier();
    // ... and give them time to get through commit().
    ThreadUtil.reallySleep(1000);
  }

  /**
   * Checks that transactions committed while the maximum number of batches is outstanding are accumulated into a
   * single new batch, and that this batch is sent once an outstanding batch is acknowledged.
   */
  public void testBatching() throws InterruptedException {

    System.err.println("Testing testBatching ...");

    final int maxBatchesOutstanding = manager.getMaxOutStandingBatches();
    TestTransactionBatch batchN;
    final Set batchTxs = new HashSet();
    final List batches = new ArrayList();

    // Fill the outstanding-batch window with single-transaction batches.
    for (int i = 0; i < maxBatchesOutstanding; i++) {
      makeAndCommitTransactions(batchTxs, 1);
      batchN = (TestTransactionBatch) batchSendQueue.take();
      System.err.println("* Recd " + batchN);
      assertEquals(batchN, getNextNewBatch());
      assertTrue(batchSendQueue.isEmpty());
      batches.add(batchN);
      assertEquals(1, batchN.transactions.size());
      assertTrue(batchTxs.containsAll(batchN.transactions));
    }

    final int num = 10;

    // Commit several more transactions concurrently; they must be queued, not sent.
    List batch2Txs = new ArrayList();
    CyclicBarrier barrier = new CyclicBarrier(num + 1);
    for (int i = 1; i <= num; i++) {
      ClientTransaction txn = makeTransaction();
      batch2Txs.add(txn);
      callCommitOnThread(txn, barrier);
    }
    batchTxs.addAll(batch2Txs);

    barrier.barrier();
    assertFalse(barrier.broken());
    ThreadUtil.reallySleep(2000);
    assertTrue(batchSendQueue.isEmpty());

    // All of them must have landed in one newly created batch.
    TestTransactionBatch batch2 = getNextNewBatch();
    Collection txnsInBatch = drainQueueInto(batch2.addTxQueue, new HashSet());
    assertEquals(batch2Txs.size(), txnsInBatch.size());
    txnsInBatch.removeAll(batch2Txs);
    assertTrue(txnsInBatch.isEmpty());
    assertTrue(batch2.addTxQueue.isEmpty());

    TestTransactionBatch batch1 = ((TestTransactionBatch) batches.remove(0));

    // Acknowledging one outstanding batch frees a slot, so batch2 gets sent.
    manager.receivedBatchAcknowledgement(batch1.batchID);
    assertSame(batch2, batchSendQueue.take());

    TestTransactionBatch batch3 = getNextNewBatch();

    // With nothing further committed, no acknowledgement of any kind may trigger another send.
    assertTrue(batchSendQueue.isEmpty());
    manager.receivedBatchAcknowledgement(batch2.batchID);
    assertTrue(batchSendQueue.isEmpty());
    for (Iterator i = batches.iterator(); i.hasNext();) {
      TestTransactionBatch b = (TestTransactionBatch) i.next();
      manager.receivedBatchAcknowledgement(b.batchID);
      assertTrue(batchSendQueue.isEmpty());
    }

    for (Iterator i = batchTxs.iterator(); i.hasNext();) {
      ClientTransaction txn = (ClientTransaction) i.next();
      manager.receivedAcknowledgement(SessionID.NULL_ID, txn.getTransactionID());
      assertTrue(batchSendQueue.isEmpty());
    }

    assertTrue(batchSendQueue.isEmpty());
    // batch3 was created but never had a transaction added to it.
    assertTrue(drainQueueInto(batch3.addTxQueue, new LinkedList()).isEmpty());
  }

  /**
   * Moves every element currently in <code>queue</code> into <code>dest</code>.
   * 
   * @return <code>dest</code>
   */
  private Collection drainQueueInto(LinkedQueue queue, Collection dest) throws InterruptedException {
    while (!queue.isEmpty()) {
      dest.add(queue.take());
    }
    return dest;
  }

  /**
   * Takes the next batch created by the factory, blocking until one is available.
   */
  private TestTransactionBatch getNextNewBatch() throws InterruptedException {
    TestTransactionBatch rv = (TestTransactionBatch) batchFactory.newBatchQueue.take();
    return rv;
  }

  /**
   * Starts a thread that waits on <code>barrier</code> and then commits <code>txn</code>. Anything thrown on that
   * thread is printed and stored in {@link #error} for tearDown() to report.
   */
  private synchronized void callCommitOnThread(final ClientTransaction txn, final CyclicBarrier barrier) {
    TransactionID txnID = txn.getTransactionID();

    Thread t = new Thread("Commit for txn #" + txnID.toLong()) {
      public void run() {
        try {
          barrier.barrier();
          manager.commit(txn);
        } catch (Throwable th) {
          th.printStackTrace();
          error.set(th);
        }
      }
    };

    threads.put(txnID, t);
    t.start();
  }

  /**
   * Builds a transaction with a fresh id, its own lock ("lock" + id) and a single field change so it is not empty.
   */
  private ClientTransaction makeTransaction() {
    int num = number.increment();
    LockID lid = new LockID("lock" + num);
    TransactionContext tc = new TransactionContext(lid, TxnType.NORMAL, new LockID[] { lid });
    ClientTransaction txn = new ClientTransactionImpl(new TransactionID(num), new NullRuntimeLogger(), null);
    txn.setTransactionContext(tc);
    txn.fieldChanged(new MockTCObject(new ObjectID(num), this), "class", "class.field", new ObjectID(num), -1);
    return txn;
  }

  /**
   * Batch test double: records added transactions and acknowledged ids, and "sends" itself by enqueueing on the
   * enclosing test's {@link #batchSendQueue}. All access to <code>transactions</code> from this class's methods is
   * guarded by the batch's monitor.
   */
  private final class TestTransactionBatch implements ClientTransactionBatch {

    public final Set         ackedTransactions = new HashSet();

    public final TxnBatchID  batchID;

    // Every transaction passed to addTransaction(), for tests that want to consume them as a queue.
    public final LinkedQueue addTxQueue        = new LinkedQueue();
    private final LinkedList transactions      = new LinkedList();

    public TestTransactionBatch(TxnBatchID batchID) {
      this.batchID = batchID;
    }

    public synchronized String toString() {
      return "TestTransactionBatch[" + batchID + "] = Txn [ " + transactions + " ]";
    }

    public synchronized boolean isEmpty() {
      return transactions.isEmpty();
    }

    public synchronized int numberOfTxns() {
      return transactions.size();
    }

    public boolean isNull() {
      return false;
    }

    public synchronized void addTransaction(ClientTransaction txn) {
      try {
        addTxQueue.put(txn);
        transactions.add(txn);
      } catch (InterruptedException e) {
        // Preserve the interrupt for callers further up the stack before converting to unchecked.
        Thread.currentThread().interrupt();
        throw new TCRuntimeException(e);
      }
    }

    public void removeTransaction(TransactionID txID) {
      return;
    }

    /**
     * Adds the id of every transaction in this batch to <code>c</code> and returns <code>c</code>.
     */
    public synchronized Collection addTransactionIDsTo(Collection c) {
      for (Iterator i = transactions.iterator(); i.hasNext();) {
        ClientTransaction txn = (ClientTransaction) i.next();
        c.add(txn.getTransactionID());
      }
      return c;
    }

    /**
     * Records the send on the enclosing test's batchSendQueue; no data leaves the process.
     */
    public void send() {
      try {
        batchSendQueue.put(this);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new TCRuntimeException(e);
      }
    }

    public TCByteBuffer[] getData() {
      return null;
    }

    public void addAcknowledgedTransactionIDs(Collection acknowledged) {
      ackedTransactions.addAll(acknowledged);
    }

    public Collection getAcknowledgedTransactionIDs() {
      throw new ImplementMe();
    }

    public TxnBatchID getTransactionBatchID() {
      return this.batchID;
    }

    public SequenceID getMinTransactionSequence() {
      throw new ImplementMe();
    }

    public void recycle() {
      return;
    }

    /**
     * Adds the sequence id of every transaction in this batch to <code>sequenceIDs</code> and returns it.
     */
    public synchronized Collection addTransactionSequenceIDsTo(Collection sequenceIDs) {
      for (Iterator i = transactions.iterator(); i.hasNext();) {
        ClientTransaction txn = (ClientTransaction) i.next();
        sequenceIDs.add(txn.getSequenceID());
      }
      return sequenceIDs;
    }

    public String dump() {
      return "TestTransactionBatch";
    }

    public int byteSize() {
      return 64000;
    }

  }

  /**
   * Factory test double: hands out {@link TestTransactionBatch} instances with increasing ids and publishes each new
   * batch on {@link #newBatchQueue} so tests can observe batch creation.
   */
  private final class TestTransactionBatchFactory implements TransactionBatchFactory {
    private long             idSequence;
    public final LinkedQueue newBatchQueue = new LinkedQueue();

    public synchronized ClientTransactionBatch nextBatch() {
      ClientTransactionBatch rv = new TestTransactionBatch(new TxnBatchID(++idSequence));
      try {
        newBatchQueue.put(rv);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new TCRuntimeException(e);
      }
      return rv;
    }
  }

  /**
   * Channel stub handed to the manager's constructor. Every method throws {@link ImplementMe}, so any use of the
   * channel by the code under test surfaces as a failure.
   */
  private static class MockChannel implements DSOClientMessageChannel {

    public void addClassMapping(TCMessageType messageType, Class messageClass) {
      throw new ImplementMe();
    }

    public void addListener(ChannelEventListener listener) {
      throw new ImplementMe();
    }

    public ClientMessageChannel channel() {
      throw new ImplementMe();
    }

    public void close() {
      throw new ImplementMe();
    }

    public AcknowledgeTransactionMessageFactory getAcknowledgeTransactionMessageFactory() {
      throw new ImplementMe();
    }

    public ChannelIDProvider getChannelIDProvider() {
      throw new ImplementMe();
    }

    public ClientHandshakeMessageFactory getClientHandshakeMessageFactory() {
      throw new ImplementMe();
    }

    public CommitTransactionMessageFactory getCommitTransactionMessageFactory() {
      throw new ImplementMe();
    }

    public LockRequestMessageFactory getLockRequestMessageFactory() {
      throw new ImplementMe();
    }

    public ObjectIDBatchRequestMessageFactory getObjectIDBatchRequestMessageFactory() {
      throw new ImplementMe();
    }

    public RequestManagedObjectMessageFactory getRequestManagedObjectMessageFactory() {
      throw new ImplementMe();
    }

    public RequestRootMessageFactory getRequestRootMessageFactory() {
      throw new ImplementMe();
    }

    public boolean isConnected() {
      throw new ImplementMe();
    }

    public void open() {
      throw new ImplementMe();
    }

    public void routeMessageType(TCMessageType messageType, Sink destSink, Sink hydrateSink) {
      throw new ImplementMe();
    }

    public JMXMessage getJMXMessage() {
      throw new ImplementMe();
    }
  }

}