1 5 package com.tc.objectserver.handler; 6 7 import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef; 8 9 import com.tc.async.api.EventContext; 10 import com.tc.async.api.Stage; 11 import com.tc.async.impl.MockSink; 12 import com.tc.async.impl.MockStage; 13 import com.tc.exception.ImplementMe; 14 import com.tc.l2.api.L2Coordinator; 15 import com.tc.l2.ha.L2HADisabledCooridinator; 16 import com.tc.l2.msg.RelayedCommitTransactionMessage; 17 import com.tc.logging.TCLogger; 18 import com.tc.net.protocol.tcm.ChannelID; 19 import com.tc.object.dmi.DmiDescriptor; 20 import com.tc.object.dna.impl.ObjectStringSerializer; 21 import com.tc.object.gtx.GlobalTransactionID; 22 import com.tc.object.lockmanager.api.LockID; 23 import com.tc.object.msg.CommitTransactionMessage; 24 import com.tc.object.msg.NullMessageRecycler; 25 import com.tc.object.net.ChannelStats; 26 import com.tc.object.net.DSOChannelManager; 27 import com.tc.object.tx.TransactionID; 28 import com.tc.object.tx.TxnBatchID; 29 import com.tc.object.tx.TxnType; 30 import com.tc.objectserver.api.ObjectManager; 31 import com.tc.objectserver.api.ObjectRequestManager; 32 import com.tc.objectserver.core.api.ServerConfigurationContext; 33 import com.tc.objectserver.gtx.TestGlobalTransactionManager; 34 import com.tc.objectserver.handshakemanager.ServerClientHandshakeManager; 35 import com.tc.objectserver.impl.TestObjectManager; 36 import com.tc.objectserver.l1.api.ClientStateManager; 37 import com.tc.objectserver.lockmanager.api.LockManager; 38 import com.tc.objectserver.persistence.api.ManagedObjectStore; 39 import com.tc.objectserver.tx.ServerTransaction; 40 import com.tc.objectserver.tx.ServerTransactionImpl; 41 import com.tc.objectserver.tx.ServerTransactionManager; 42 import com.tc.objectserver.tx.TestServerTransactionManager; 43 import com.tc.objectserver.tx.TestTransactionBatchManager; 44 import com.tc.objectserver.tx.TestTransactionalStageCoordinator; 45 import com.tc.objectserver.tx.TransactionBatchReader; 46 import com.tc.objectserver.tx.TransactionBatchReaderFactory; 47 import com.tc.objectserver.tx.TransactionSequencer; 48 import com.tc.objectserver.tx.TransactionalObjectManager; 49 import com.tc.objectserver.tx.TransactionalObjectManagerImpl; 50 import com.tc.test.TCTestCase; 51 import com.tc.util.SequenceID; 52 import com.tc.util.SequenceValidator; 53 54 import java.util.Collection ; 55 import java.util.Collections ; 56 import java.util.HashMap ; 57 import java.util.HashSet ; 58 import java.util.LinkedList ; 59 import java.util.List ; 60 import java.util.Map ; 61 62 public class ProcessTransactionHandlerTest extends TCTestCase { 63 64 private TestServerConfigurationContext cctxt; 65 private TestObjectManager objectManager; 66 private ProcessTransactionHandler handler; 67 private TestTransactionBatchReaderFactory transactionBatchReaderFactory; 68 private SynchronizedRef batchReader; 69 private TestTransactionBatchManager transactionBatchManager; 70 private TestGlobalTransactionManager gtxm; 71 private SequenceValidator sequenceValidator; 72 private TransactionalObjectManager txnObjectManager; 73 private TestTransactionalStageCoordinator txnStageCoordinator; 74 public L2Coordinator l2Coordinator; 75 public TestServerTransactionManager transactionMgr; 76 77 public void setUp() throws Exception { 78 objectManager = new TestObjectManager(); 79 transactionBatchManager = new TestTransactionBatchManager(); 80 gtxm = new TestGlobalTransactionManager(); 81 sequenceValidator = new SequenceValidator(0); 82 txnStageCoordinator = new TestTransactionalStageCoordinator(); 83 txnObjectManager = new TransactionalObjectManagerImpl(objectManager, new TransactionSequencer(), gtxm, 84 txnStageCoordinator); 85 handler = new ProcessTransactionHandler(transactionBatchManager, txnObjectManager, sequenceValidator, 86 new NullMessageRecycler()); 87 88 transactionBatchReaderFactory = new TestTransactionBatchReaderFactory(); 89 transactionMgr = new TestServerTransactionManager(); 90 l2Coordinator = new L2HADisabledCooridinator(); 91 cctxt = new TestServerConfigurationContext(); 92 batchReader = new SynchronizedRef(null); 93 handler.initialize(cctxt); 94 } 95 96 public void tests() throws Exception { 97 98 TestTransactionBatchReader batch = new TestTransactionBatchReader(); 99 batch.channelID = new ChannelID(1); 100 batch.batchID = new TxnBatchID(1); 101 102 final List dnaList = Collections.EMPTY_LIST; 103 final Map newRootsMap = Collections.EMPTY_MAP; 104 ServerTransaction serverTransaction = new ServerTransactionImpl(batch.batchID, new TransactionID(1), 105 new SequenceID(1), new LockID[0], batch.channelID, 106 dnaList, new ObjectStringSerializer(), newRootsMap, 107 TxnType.NORMAL, new LinkedList (), 108 DmiDescriptor.EMPTY_ARRAY); 109 Collection completedTransactionIDs = new HashSet (); 110 for (int i = 0; i < 10; i++) { 111 completedTransactionIDs.add(new GlobalTransactionID(i)); 112 } 113 batch.acknowledged.addAll(completedTransactionIDs); 114 batch.transactions.add(serverTransaction); 115 116 batchReader.set(batch); 117 118 assertTrue(transactionBatchManager.defineBatchContexts.isEmpty()); 120 assertTrue(objectManager.lookupObjectForCreateIfNecessaryContexts.isEmpty()); 121 objectManager.makePending = true; 123 handler.handleEvent(null); 124 Object [] args = (Object []) transactionBatchManager.defineBatchContexts.take(); 126 assertEquals(batch.channelID, args[0]); 127 assertEquals(batch.batchID, args[1]); 128 assertEquals(new Integer (1), args[2]); 129 assertTrue(transactionBatchManager.defineBatchContexts.isEmpty()); 131 132 Object [] incomingCallContext = (Object []) transactionMgr.incomingTxnContexts.remove(0); 134 assertNotNull(incomingCallContext); 135 assertTrue(transactionMgr.incomingTxnContexts.isEmpty()); 136 137 MockSink lookupSink = txnStageCoordinator.lookupSink; 139 assertFalse(lookupSink.queue.isEmpty()); 140 141 EventContext context = (EventContext) lookupSink.queue.remove(0); 142 assertNotNull(context); 143 144 args = (Object []) objectManager.lookupObjectForCreateIfNecessaryContexts.poll(100); 146 assertNull(args); 147 148 batch = new TestTransactionBatchReader(); 150 batch.channelID = new ChannelID(1); 151 batch.batchID = new TxnBatchID(2); 152 153 serverTransaction = new ServerTransactionImpl(batch.batchID, new TransactionID(2), new SequenceID(2), 154 new LockID[0], batch.channelID, dnaList, 155 new ObjectStringSerializer(), newRootsMap, TxnType.NORMAL, 156 new LinkedList (), DmiDescriptor.EMPTY_ARRAY); 157 completedTransactionIDs = new HashSet (); 158 for (int i = 11; i < 20; i++) { 159 completedTransactionIDs.add(new GlobalTransactionID(i)); 160 } 161 batch.acknowledged.addAll(completedTransactionIDs); 162 batch.transactions.add(serverTransaction); 163 164 batchReader.set(batch); 165 166 assertTrue(transactionBatchManager.defineBatchContexts.isEmpty()); 168 assertTrue(objectManager.lookupObjectForCreateIfNecessaryContexts.isEmpty()); 169 objectManager.makePending = true; 171 handler.handleEvent(null); 172 assertFalse(lookupSink.queue.isEmpty()); 173 174 incomingCallContext = (Object []) transactionMgr.incomingTxnContexts.remove(0); 176 assertNotNull(incomingCallContext); 177 assertTrue(transactionMgr.incomingTxnContexts.isEmpty()); 178 179 context = (EventContext) lookupSink.queue.remove(0); 180 assertNotNull(context); 181 182 } 183 184 private final class TestTransactionBatchReader implements TransactionBatchReader { 185 186 public final Collection acknowledged = new HashSet (); 187 public TxnBatchID batchID; 188 public ChannelID channelID; 189 public final List transactions = new LinkedList (); 190 int current = 0; 191 192 public ServerTransaction getNextTransaction() { 193 return transactions.size() > current ? (ServerTransaction) transactions.get(current++) : null; 194 } 195 196 public void reset() { 197 current = 0; 198 } 199 200 public TxnBatchID getBatchID() { 201 return batchID; 202 } 203 204 public int getNumTxns() { 205 return transactions.size(); 206 } 207 208 public ChannelID getChannelID() { 209 return channelID; 210 } 211 212 public Collection addAcknowledgedTransactionIDsTo(Collection c) { 213 c.addAll(acknowledged); 214 return c; 215 } 216 217 } 218 219 private final class TestTransactionBatchReaderFactory implements TransactionBatchReaderFactory { 220 221 public TransactionBatchReader newTransactionBatchReader(CommitTransactionMessage ctxt) { 222 return (TransactionBatchReader) batchReader.get(); 223 } 224 225 public TransactionBatchReader newTransactionBatchReader(RelayedCommitTransactionMessage commitMessage) { 226 throw new ImplementMe(); 227 } 228 229 } 230 231 private final class TestServerConfigurationContext implements ServerConfigurationContext { 232 233 public Map sinks = new HashMap (); 234 235 public ObjectManager getObjectManager() { 236 return objectManager; 237 } 238 239 public LockManager getLockManager() { 240 return null; 241 } 242 243 public DSOChannelManager getChannelManager() { 244 return null; 245 } 246 247 public ClientStateManager getClientStateManager() { 248 return null; 249 } 250 251 public ServerTransactionManager getTransactionManager() { 252 return transactionMgr; 253 } 254 255 public ManagedObjectStore getObjectStore() { 256 return null; 257 } 258 259 public ServerClientHandshakeManager getClientHandshakeManager() { 260 return null; 261 } 262 263 public ChannelStats getChannelStats() { 264 return null; 265 } 266 267 public Stage getStage(String name) { 268 if (!sinks.containsKey(name)) { 269 sinks.put(name, new MockStage(name)); 270 } 271 return (Stage) sinks.get(name); 272 } 273 274 public TCLogger getLogger(Class clazz) { 275 return null; 276 } 277 278 public TransactionBatchReaderFactory getTransactionBatchReaderFactory() { 279 return transactionBatchReaderFactory; 280 } 281 282 public ObjectRequestManager getObjectRequestManager() { 283 throw new ImplementMe(); 284 } 285 286 public TransactionalObjectManager getTransactionalObjectManager() { 287 return txnObjectManager; 288 } 289 290 public L2Coordinator getL2Coordinator() { 291 return l2Coordinator; 292 } 293 } 294 295 } 296 | Popular Tags |