package com.tc.objectserver.handler;

import com.tc.async.api.EventContext;
import com.tc.async.impl.MockStage;
import com.tc.exception.ImplementMe;
import com.tc.net.TCSocketAddress;
import com.tc.net.protocol.NetworkStackID;
import com.tc.net.protocol.TCNetworkMessage;
import com.tc.net.protocol.tcm.ChannelEventListener;
import com.tc.net.protocol.tcm.ChannelID;
import com.tc.net.protocol.tcm.MessageChannel;
import com.tc.net.protocol.tcm.TCMessage;
import com.tc.net.protocol.tcm.TCMessageType;
import com.tc.object.dmi.DmiDescriptor;
import com.tc.object.dna.impl.ObjectStringSerializer;
import com.tc.object.gtx.GlobalTransactionID;
import com.tc.object.lockmanager.api.LockID;
import com.tc.object.msg.BatchTransactionAcknowledgeMessage;
import com.tc.object.net.DSOChannelManager;
import com.tc.object.net.DSOChannelManagerEventListener;
import com.tc.object.net.NoSuchChannelException;
import com.tc.object.tx.ServerTransactionID;
import com.tc.object.tx.TransactionID;
import com.tc.object.tx.TxnBatchID;
import com.tc.object.tx.TxnType;
import com.tc.objectserver.context.BroadcastChangeContext;
import com.tc.objectserver.core.api.ServerConfigurationContext;
import com.tc.objectserver.core.impl.TestServerConfigurationContext;
import com.tc.objectserver.gtx.TestGlobalTransactionManager;
import com.tc.objectserver.l1.api.TestClientStateManager;
import com.tc.objectserver.lockmanager.api.NotifiedWaiters;
import com.tc.objectserver.lockmanager.api.TestLockManager;
import com.tc.objectserver.managedobject.BackReferences;
import com.tc.objectserver.tx.NullTransactionalObjectManager;
import com.tc.objectserver.tx.ServerTransaction;
import com.tc.objectserver.tx.ServerTransactionImpl;
import com.tc.objectserver.tx.TestServerTransactionManager;
import com.tc.objectserver.tx.TestTransactionBatchManager;
import com.tc.test.TCTestCase;
import com.tc.util.SequenceID;
45 import com.tc.util.concurrent.NoExceptionLinkedQueue; 46 47 import java.util.Collection ; 48 import java.util.Collections ; 49 import java.util.HashMap ; 50 import java.util.LinkedList ; 51 import java.util.List ; 52 import java.util.Map ; 53 54 public class BroadcastChangeHandlerTest extends TCTestCase { 55 56 private TestTransactionBatchManager transactionBatchManager; 57 private BroadcastChangeHandler handler; 58 private TestServerTransactionManager transactionManager; 59 private TestChannelManager channelManager; 60 private TestGlobalTransactionManager gtxm; 61 private TestLockManager lockManager; 62 63 public void setUp() throws Exception { 64 transactionBatchManager = new TestTransactionBatchManager(); 65 transactionManager = new TestServerTransactionManager(); 66 channelManager = new TestChannelManager(2); 67 lockManager = new TestLockManager(); 68 gtxm = new TestGlobalTransactionManager(); 69 handler = new BroadcastChangeHandler(transactionBatchManager); 70 71 MockStage stageMOR = new MockStage("MOR"); 72 MockStage stageRTO = new MockStage("RTO"); 73 TestServerConfigurationContext context = new TestServerConfigurationContext(); 74 context.transactionManager = transactionManager; 75 context.channelManager = channelManager; 76 context.txnObjectManager = new NullTransactionalObjectManager(); 77 context.clientStateManager = new TestClientStateManager(); 78 context.addStage(ServerConfigurationContext.MANAGED_OBJECT_REQUEST_STAGE, stageMOR); 79 context.addStage(ServerConfigurationContext.RESPOND_TO_OBJECT_REQUEST_STAGE, stageRTO); 80 context.lockManager = lockManager; 81 82 handler.initialize(context); 83 } 84 85 public void testBatchAccounting() throws Exception { 86 87 TxnBatchID batchID = new TxnBatchID(1); 88 TransactionID txID = new TransactionID(1); 89 LockID[] lockIDs = new LockID[0]; 90 ChannelID channelID = new ChannelID(1); 91 List dnas = Collections.unmodifiableList(new LinkedList ()); 92 ObjectStringSerializer serializer = null; 93 Map newRoots = 
Collections.unmodifiableMap(new HashMap ()); 94 TxnType txnType = TxnType.NORMAL; 95 SequenceID sequenceID = new SequenceID(1); 96 ServerTransaction tx = new ServerTransactionImpl(batchID, txID, sequenceID, lockIDs, channelID, dnas, serializer, 97 newRoots, txnType, new LinkedList (), DmiDescriptor.EMPTY_ARRAY); 98 99 assertTrue(transactionBatchManager.batchComponentCompleteCalls.isEmpty()); 100 assertFalse(transactionBatchManager.isBatchComponentComplete.get()); 101 assertTrue(channelManager.newBatchTransactionAcknowledgeMessageCalls.isEmpty()); 102 assertFalse(channelManager.shouldThrowNoSuchChannelException); 103 handler.handleEvent(getBroadcastTxnContext(tx)); 104 105 assertBatchComponentCompleteCalled(batchID, txID, channelID); 107 108 assertTrue(channelManager.newBatchTransactionAcknowledgeMessageCalls.isEmpty()); 110 111 transactionBatchManager.isBatchComponentComplete.set(true); 113 channelManager.btamsg = new TestBatchTransactionAcknowledgeMessage(); 114 assertNotNull(channelManager.btamsg); 115 handler.handleEvent(getBroadcastTxnContext(tx)); 116 assertBatchComponentCompleteCalled(batchID, txID, channelID); 117 118 assertEquals(channelID, channelManager.newBatchTransactionAcknowledgeMessageCalls.poll(1)); 120 assertEquals(batchID, channelManager.btamsg.initializeCalls.poll(1)); 121 assertNotNull(channelManager.btamsg.sendCalls.poll(1)); 122 123 channelManager.shouldThrowNoSuchChannelException = true; 125 channelManager.btamsg = null; 126 assertTrue(transactionBatchManager.batchComponentCompleteCalls.isEmpty()); 127 assertTrue(transactionBatchManager.isBatchComponentComplete.get()); 128 assertTrue(channelManager.newBatchTransactionAcknowledgeMessageCalls.isEmpty()); 129 assertTrue(channelManager.shouldThrowNoSuchChannelException); 130 131 handler.handleEvent(getBroadcastTxnContext(tx)); 132 assertNotNull(channelManager.newBatchTransactionAcknowledgeMessageCalls.poll(1)); 133 } 134 135 private EventContext getBroadcastTxnContext(ServerTransaction tx) { 136 
ServerTransactionID stxID = tx.getServerTransactionID(); 137 GlobalTransactionID gid = gtxm.createGlobalTransactionID(stxID); 138 return new BroadcastChangeContext(gid, tx, gtxm.getLowGlobalTransactionIDWatermark(), new NotifiedWaiters(), 139 new BackReferences()); 140 } 141 142 private void assertBatchComponentCompleteCalled(TxnBatchID batchID, TransactionID txID, ChannelID channelID) { 143 Object [] args = (Object []) transactionBatchManager.batchComponentCompleteCalls.poll(1); 144 assertTrue(transactionBatchManager.batchComponentCompleteCalls.isEmpty()); 145 assertNotNull(args); 146 assertEquals(channelID, args[0]); 147 assertEquals(batchID, args[1]); 148 assertEquals(txID, args[2]); 149 } 150 151 private static class TestBatchTransactionAcknowledgeMessage implements BatchTransactionAcknowledgeMessage { 152 public final NoExceptionLinkedQueue initializeCalls = new NoExceptionLinkedQueue(); 153 public final NoExceptionLinkedQueue sendCalls = new NoExceptionLinkedQueue(); 154 155 public void initialize(TxnBatchID id) { 156 initializeCalls.put(id); 157 } 158 159 public TxnBatchID getBatchID() { 160 throw new ImplementMe(); 161 } 162 163 public void send() { 164 sendCalls.put(new Object ()); 165 } 166 167 } 168 169 private static class TestChannelManager implements DSOChannelManager { 170 171 Map allChannels = new HashMap (); 172 173 public TestChannelManager(int noOfChannels) { 174 while (noOfChannels-- > 0) { 175 ChannelID cid = new ChannelID(noOfChannels); 176 allChannels.put(cid, new TestMessageChannel(cid)); 177 } 178 } 179 180 public void closeAll(Collection channelIDs) { 181 throw new ImplementMe(); 182 } 183 184 public MessageChannel getActiveChannel(ChannelID id) { 185 return (MessageChannel) allChannels.get(id); 186 } 187 188 public MessageChannel[] getActiveChannels() { 189 return (MessageChannel[]) allChannels.values().toArray(new MessageChannel[0]); 190 } 191 192 public boolean isActiveID(ChannelID channelID) { 193 throw new ImplementMe(); 194 } 195 196 
public String getChannelAddress(ChannelID channelID) { 197 throw new ImplementMe(); 198 } 199 200 public NoExceptionLinkedQueue newBatchTransactionAcknowledgeMessageCalls = new NoExceptionLinkedQueue(); 201 public TestBatchTransactionAcknowledgeMessage btamsg; 202 public boolean shouldThrowNoSuchChannelException = false; 203 204 public BatchTransactionAcknowledgeMessage newBatchTransactionAcknowledgeMessage(ChannelID channelID) 205 throws NoSuchChannelException { 206 this.newBatchTransactionAcknowledgeMessageCalls.put(channelID); 207 if (shouldThrowNoSuchChannelException) { throw new NoSuchChannelException(); } 208 return btamsg; 209 } 210 211 public Collection getAllActiveChannelIDs() { 212 throw new ImplementMe(); 213 } 214 215 public void addEventListener(DSOChannelManagerEventListener listener) { 216 throw new ImplementMe(); 217 } 218 219 public void makeChannelActive(ChannelID channelID, long startIDs, long endIDs, boolean persistent) { 220 throw new ImplementMe(); 221 } 222 223 public Collection getRawChannelIDs() { 224 throw new ImplementMe(); 225 } 226 227 public void makeChannelActiveNoAck(MessageChannel channel) { 228 throw new ImplementMe(); 229 } 230 } 231 232 private static final class TestMessageChannel implements MessageChannel { 233 234 private final ChannelID cid; 235 236 public TestMessageChannel(ChannelID cid) { 237 this.cid = cid; 238 } 239 240 public TCSocketAddress getLocalAddress() { 241 throw new ImplementMe(); 242 } 243 244 public TCSocketAddress getRemoteAddress() { 245 throw new ImplementMe(); 246 } 247 248 public void addListener(ChannelEventListener listener) { 249 throw new ImplementMe(); 250 } 251 252 public ChannelID getChannelID() { 253 return cid; 254 } 255 256 public boolean isOpen() { 257 throw new ImplementMe(); 258 } 259 260 public boolean isClosed() { 261 throw new ImplementMe(); 262 } 263 264 public TCMessage createMessage(TCMessageType type) { 265 throw new ImplementMe(); 266 } 267 268 public Object getAttachment(String key) 
{ 269 throw new ImplementMe(); 270 } 271 272 public void addAttachment(String key, Object value, boolean replace) { 273 throw new ImplementMe(); 274 } 275 276 public Object removeAttachment(String key) { 277 throw new ImplementMe(); 278 } 279 280 public boolean isConnected() { 281 throw new ImplementMe(); 282 } 283 284 public void send(TCNetworkMessage message) { 285 throw new ImplementMe(); 286 } 287 288 public NetworkStackID open() { 289 throw new ImplementMe(); 290 } 291 292 public void close() { 293 throw new ImplementMe(); 294 } 295 296 } 297 } 298 | Popular Tags |