1 5 package com.tc.objectserver.handshakemanager; 6 7 import com.tc.exception.ImplementMe; 8 import com.tc.logging.TCLogging; 9 import com.tc.net.protocol.tcm.ChannelID; 10 import com.tc.net.protocol.tcm.MessageChannel; 11 import com.tc.net.protocol.tcm.TestMessageChannel; 12 import com.tc.net.protocol.transport.ConnectionID; 13 import com.tc.object.ObjectID; 14 import com.tc.object.lockmanager.api.LockContext; 15 import com.tc.object.lockmanager.api.LockID; 16 import com.tc.object.lockmanager.api.LockLevel; 17 import com.tc.object.lockmanager.api.ThreadID; 18 import com.tc.object.lockmanager.api.WaitContext; 19 import com.tc.object.msg.BatchTransactionAcknowledgeMessage; 20 import com.tc.object.msg.ClientHandshakeAckMessage; 21 import com.tc.object.msg.TestClientHandshakeMessage; 22 import com.tc.object.net.DSOChannelManager; 23 import com.tc.object.net.DSOChannelManagerEventListener; 24 import com.tc.object.tx.WaitInvocation; 25 import com.tc.objectserver.api.TestSink; 26 import com.tc.objectserver.impl.TestObjectManager; 27 import com.tc.objectserver.l1.api.TestClientStateManager; 28 import com.tc.objectserver.l1.api.TestClientStateManager.AddReferenceContext; 29 import com.tc.objectserver.lockmanager.api.TestLockManager; 30 import com.tc.objectserver.lockmanager.api.TestLockManager.ReestablishLockContext; 31 import com.tc.objectserver.lockmanager.api.TestLockManager.WaitCallContext; 32 import com.tc.objectserver.tx.TestServerTransactionManager; 33 import com.tc.test.TCTestCase; 34 import com.tc.util.SequenceID; 35 import com.tc.util.SequenceValidator; 36 import com.tc.util.TestTimer; 37 import com.tc.util.TestTimer.ScheduleCallContext; 38 import com.tc.util.concurrent.NoExceptionLinkedQueue; 39 import com.tc.util.sequence.ObjectIDSequenceProvider; 40 41 import java.util.ArrayList ; 42 import java.util.Collection ; 43 import java.util.HashMap ; 44 import java.util.HashSet ; 45 import java.util.Iterator ; 46 import java.util.LinkedList ; 47 import java.util.List ; 48 import java.util.Map ; 49 import java.util.Set ; 50 51 public class ServerClientHandshakeManagerTest extends TCTestCase { 52 53 private ServerClientHandshakeManager hm; 54 private TestObjectManager objectManager; 55 private TestClientStateManager clientStateManager; 56 private TestLockManager lockManager; 57 private TestSink lockResponseSink; 58 private long reconnectTimeout; 59 private Set existingUnconnectedClients; 60 private TestTimer timer; 61 private TestChannelManager channelManager; 62 private SequenceValidator sequenceValidator; 63 private long objectIDSequenceStart; 64 65 public void setUp() { 66 existingUnconnectedClients = new HashSet (); 67 objectManager = new TestObjectManager(); 68 clientStateManager = new TestClientStateManager(); 69 lockManager = new TestLockManager(); 70 lockResponseSink = new TestSink(); 71 reconnectTimeout = 10 * 1000; 72 timer = new TestTimer(); 73 channelManager = new TestChannelManager(); 74 sequenceValidator = new SequenceValidator(0); 75 objectIDSequenceStart = 1000; 76 } 77 78 private void initHandshakeManager() { 79 this.hm = new ServerClientHandshakeManager(TCLogging.getLogger(ServerClientHandshakeManager.class), channelManager, 80 objectManager, sequenceValidator, clientStateManager, lockManager, 81 new TestServerTransactionManager(), lockResponseSink, 82 new ObjectIDSequenceProvider(objectIDSequenceStart), timer, 83 reconnectTimeout, false); 84 this.hm.setStarting(convertToConnectionIds(existingUnconnectedClients)); 85 } 86 87 private Set convertToConnectionIds(Set s) { 88 HashSet ns = new HashSet (); 89 for (Iterator i = s.iterator(); i.hasNext();) { 90 ChannelID cid = (ChannelID) i.next(); 91 ns.add(new ConnectionID(cid.toLong(), "FORTESTING")); 92 } 93 return ns; 94 } 95 96 public void testNoUnconnectedClients() throws Exception { 97 initHandshakeManager(); 98 assertStarted(); 99 } 100 101 public void testTimeout() throws Exception { 102 ChannelID channelID1 = new ChannelID(100); 103 104 existingUnconnectedClients.add(channelID1); 105 existingUnconnectedClients.add(new ChannelID(101)); 106 107 initHandshakeManager(); 108 109 TestClientHandshakeMessage handshake = newClientHandshakeMessage(channelID1); 110 hm.notifyClientConnect(handshake); 111 112 assertEquals(1, timer.scheduleCalls.size()); 114 TestTimer.ScheduleCallContext scc = (ScheduleCallContext) timer.scheduleCalls.get(0); 115 116 assertTrue(timer.cancelCalls.isEmpty()); 119 scc.task.run(); 120 assertEquals(1, timer.cancelCalls.size()); 121 assertEquals(1, channelManager.closeAllChannelIDs.size()); 122 assertEquals(new ChannelID(101), channelManager.closeAllChannelIDs.get(0)); 123 124 assertStarted(); 126 } 127 128 public void testNotifyTimeout() throws Exception { 129 ChannelID channelID1 = new ChannelID(1); 130 ChannelID channelID2 = new ChannelID(2); 131 132 existingUnconnectedClients.add(channelID1); 133 existingUnconnectedClients.add(channelID2); 134 135 initHandshakeManager(); 136 137 assertFalse(hm.isStarted()); 138 139 hm.notifyTimeout(); 142 assertEquals(2, channelManager.closeAllChannelIDs.size()); 143 assertEquals(existingUnconnectedClients, new HashSet (channelManager.closeAllChannelIDs)); 144 assertStarted(); 145 } 146 147 public void testBasic() throws Exception { 148 final Set connectedClients = new HashSet (); 149 ChannelID channelID1 = new ChannelID(100); 150 ChannelID channelID2 = new ChannelID(101); 151 ChannelID channelID3 = new ChannelID(102); 152 153 157 existingUnconnectedClients.add(channelID1); 158 existingUnconnectedClients.add(channelID2); 159 160 initHandshakeManager(); 161 162 TestClientHandshakeMessage handshake = newClientHandshakeMessage(channelID1); 163 ArrayList sequenceIDs = new ArrayList (); 164 SequenceID minSequenceID = new SequenceID(10); 165 sequenceIDs.add(minSequenceID); 166 handshake.transactionSequenceIDs = sequenceIDs; 167 handshake.clientObjectIds.add(new ObjectID(200)); 168 handshake.clientObjectIds.add(new ObjectID(20002)); 169 170 List lockContexts = new LinkedList (); 171 172 lockContexts.add(new LockContext(new LockID("my lock"), channelID1, new ThreadID(10001), LockLevel.WRITE)); 173 lockContexts.add(new LockContext(new LockID("my other lock)"), channelID1, new ThreadID(10002), LockLevel.READ)); 174 handshake.lockContexts.addAll(lockContexts); 175 176 WaitContext waitContext = new WaitContext(new LockID("d;alkjd"), channelID1, new ThreadID(101), LockLevel.WRITE, 177 new WaitInvocation()); 178 handshake.waitContexts.add(waitContext); 179 handshake.isChangeListener = true; 180 181 assertFalse(sequenceValidator.isNext(handshake.getChannelID(), new SequenceID(minSequenceID.toLong()))); 182 assertEquals(2, existingUnconnectedClients.size()); 183 assertFalse(hm.isStarted()); 184 assertTrue(hm.isStarting()); 185 186 sequenceValidator.remove(handshake.getChannelID()); 188 189 channelManager.channelIDs.add(handshake.channelID); 191 hm.notifyClientConnect(handshake); 192 connectedClients.add(handshake); 193 194 assertTrue(hm.isStarting()); 196 assertFalse(hm.isStarted()); 197 198 assertEquals(1, timer.scheduleCalls.size()); 200 TestTimer.ScheduleCallContext scc = (ScheduleCallContext) timer.scheduleCalls.get(0); 201 assertEquals(new Long (reconnectTimeout), scc.delay); 202 assertTrue(scc.period == null); 203 assertTrue(scc.time == null); 204 205 assertTrue(sequenceValidator.isNext(handshake.getChannelID(), new SequenceID(minSequenceID.toLong()))); 207 208 assertTrue(handshake.clientObjectIds.size() > 0); 211 assertEquals(handshake.clientObjectIds.size(), clientStateManager.addReferenceCalls.size()); 212 for (Iterator i = clientStateManager.addReferenceCalls.iterator(); i.hasNext();) { 213 TestClientStateManager.AddReferenceContext ctxt = (AddReferenceContext) i.next(); 214 assertTrue(handshake.clientObjectIds.remove(ctxt.objectID)); 215 } 216 assertTrue(handshake.clientObjectIds.isEmpty()); 217 218 assertEquals(lockContexts.size(), handshake.lockContexts.size()); 220 assertEquals(handshake.lockContexts.size(), lockManager.reestablishLockCalls.size()); 221 for (int i = 0; i < lockContexts.size(); i++) { 222 LockContext lockContext = (LockContext) lockContexts.get(i); 223 TestLockManager.ReestablishLockContext ctxt = (ReestablishLockContext) lockManager.reestablishLockCalls.get(i); 224 assertEquals(lockContext.getLockID(), ctxt.lockContext.getLockID()); 225 assertEquals(lockContext.getChannelID(), ctxt.lockContext.getChannelID()); 226 assertEquals(lockContext.getThreadID(), ctxt.lockContext.getThreadID()); 227 assertEquals(lockContext.getLockLevel(), ctxt.lockContext.getLockLevel()); 228 } 229 230 assertEquals(1, handshake.waitContexts.size()); 232 assertEquals(handshake.waitContexts.size(), lockManager.reestablishWaitCalls.size()); 233 TestLockManager.WaitCallContext ctxt = (WaitCallContext) lockManager.reestablishWaitCalls.get(0); 234 assertEquals(waitContext.getLockID(), ctxt.lockID); 235 assertEquals(waitContext.getChannelID(), ctxt.channelID); 236 assertEquals(waitContext.getThreadID(), ctxt.threadID); 237 assertEquals(waitContext.getWaitInvocation(), ctxt.waitInvocation); 238 assertSame(lockResponseSink, ctxt.lockResponseSink); 239 240 assertEquals(0, timer.cancelCalls.size()); 241 242 assertEquals(0, channelManager.handshakeMessages.size()); 244 245 handshake = newClientHandshakeMessage(channelID2); 247 channelManager.channelIDs.add(handshake.channelID); 248 hm.notifyClientConnect(handshake); 249 connectedClients.add(handshake); 250 251 assertStarted(); 252 253 assertEquals(1, timer.cancelCalls.size()); 255 256 handshake = newClientHandshakeMessage(channelID3); 258 channelManager.channelIDs.add(handshake.channelID); 259 hm.notifyClientConnect(handshake); 260 connectedClients.add(handshake); 261 262 for (Iterator i = connectedClients.iterator(); i.hasNext();) { 264 handshake = (TestClientHandshakeMessage) i.next(); 265 Collection acks = channelManager.getMessages(handshake.channelID); 266 assertEquals("Wrong number of acks for channel: " + handshake.channelID, 1, acks.size()); 267 TestClientHandshakeAckMessage ack = (TestClientHandshakeAckMessage) new ArrayList (acks).get(0); 268 assertNotNull(ack.sendQueue.poll(1)); 269 } 270 } 271 272 public void testObjectIDsInHandshake() throws Exception { 273 final Set connectedClients = new HashSet (); 274 ChannelID channelID1 = new ChannelID(100); 275 ChannelID channelID2 = new ChannelID(101); 276 ChannelID channelID3 = new ChannelID(102); 277 278 existingUnconnectedClients.add(channelID1); 279 existingUnconnectedClients.add(channelID2); 280 281 initHandshakeManager(); 282 283 TestClientHandshakeMessage handshake = newClientHandshakeMessage(channelID1); 284 handshake.setIsObjectIDsRequested(true); 285 286 hm.notifyClientConnect(handshake); 287 channelManager.channelIDs.add(handshake.channelID); 288 connectedClients.add(handshake); 289 290 assertEquals(0, channelManager.handshakeMessages.size()); 292 293 handshake = newClientHandshakeMessage(channelID2); 295 handshake.setIsObjectIDsRequested(false); 296 channelManager.channelIDs.add(handshake.channelID); 297 hm.notifyClientConnect(handshake); 298 connectedClients.add(handshake); 299 300 assertStarted(); 301 302 handshake = newClientHandshakeMessage(channelID3); 304 handshake.setIsObjectIDsRequested(true); 305 channelManager.channelIDs.add(handshake.channelID); 306 hm.notifyClientConnect(handshake); 307 connectedClients.add(handshake); 308 309 for (Iterator i = connectedClients.iterator(); i.hasNext();) { 311 handshake = (TestClientHandshakeMessage) i.next(); 312 Collection acks = channelManager.getMessages(handshake.channelID); 313 assertEquals("Wrong number of acks for channel: " + handshake.channelID, 1, acks.size()); 314 TestClientHandshakeAckMessage ack = (TestClientHandshakeAckMessage) new ArrayList (acks).get(0); 315 assertNotNull(ack.sendQueue.poll(1)); 316 317 if (ack.channelID.equals(channelID2)) { 318 assertTrue(ack.getObjectIDSequenceStart() == 0); 319 assertTrue(ack.getObjectIDSequenceEnd() == 0); 320 } else { 321 assertFalse(ack.getObjectIDSequenceStart() == 0); 322 assertFalse(ack.getObjectIDSequenceEnd() == 0); 323 assertTrue(ack.getObjectIDSequenceStart() < ack.getObjectIDSequenceEnd()); 324 } 325 } 326 } 327 328 private void assertStarted() { 329 assertEquals(1, lockManager.startCalls.size()); 331 332 assertNotNull(objectManager.startCalls.poll(1)); 334 335 assertTrue(hm.isStarted()); 337 } 338 339 private TestClientHandshakeMessage newClientHandshakeMessage(ChannelID channelID1) { 340 TestClientHandshakeMessage handshake = new TestClientHandshakeMessage(); 341 handshake.channelID = channelID1; 342 ArrayList sequenceIDs = new ArrayList (); 343 sequenceIDs.add(new SequenceID(1)); 344 handshake.setTransactionSequenceIDs(sequenceIDs); 345 return handshake; 346 } 347 348 private static final class TestChannelManager implements DSOChannelManager { 349 350 public final List closeAllChannelIDs = new ArrayList (); 351 public final Map handshakeMessages = new HashMap (); 352 public final Collection channelIDs = new HashSet (); 353 354 public void closeAll(Collection theChannelIDs) { 355 closeAllChannelIDs.addAll(theChannelIDs); 356 } 357 358 public MessageChannel getActiveChannel(ChannelID id) { 359 return null; 360 } 361 362 public MessageChannel[] getActiveChannels() { 363 return null; 364 } 365 366 public Collection getAllActiveChannelIDs() { 367 return this.channelIDs; 368 } 369 370 public boolean isValidID(ChannelID channelID) { 371 return false; 372 } 373 374 public String getChannelAddress(ChannelID channelID) { 375 return null; 376 } 377 378 public Collection getMessages(ChannelID channelID) { 379 Collection msgs = (Collection ) this.handshakeMessages.get(channelID); 380 if (msgs == null) { 381 msgs = new ArrayList (); 382 this.handshakeMessages.put(channelID, msgs); 383 } 384 return msgs; 385 } 386 387 private ClientHandshakeAckMessage newClientHandshakeAckMessage(ChannelID channelID) { 388 ClientHandshakeAckMessage msg = new TestClientHandshakeAckMessage(channelID); 389 getMessages(channelID).add(msg); 390 return msg; 391 } 392 393 public BatchTransactionAcknowledgeMessage newBatchTransactionAcknowledgeMessage(ChannelID channelID) { 394 throw new ImplementMe(); 395 } 396 397 public void addEventListener(DSOChannelManagerEventListener listener) { 398 throw new ImplementMe(); 399 } 400 401 public Collection getRawChannelIDs() { 402 return getAllActiveChannelIDs(); 403 } 404 405 public boolean isActiveID(ChannelID channelID) { 406 throw new ImplementMe(); 407 } 408 409 public void makeChannelActive(ChannelID channelID, long startIDs, long endIDs, boolean persistent) { 410 ClientHandshakeAckMessage ackMsg = newClientHandshakeAckMessage(channelID); 411 ackMsg.initialize(startIDs, endIDs, persistent, getActiveChannels()); 412 ackMsg.send(); 413 } 414 415 public void makeChannelActiveNoAck(MessageChannel channel) { 416 } 418 419 } 420 421 private static class TestClientHandshakeAckMessage implements ClientHandshakeAckMessage { 422 public final NoExceptionLinkedQueue sendQueue = new NoExceptionLinkedQueue(); 423 public final ChannelID channelID; 424 public long start; 425 public long end; 426 private boolean persistent; 427 private final TestMessageChannel channel; 428 429 private TestClientHandshakeAckMessage(ChannelID channelID) { 430 this.channelID = channelID; 431 this.channel = new TestMessageChannel(); 432 this.channel.channelID = channelID; 433 } 434 435 public void send() { 436 sendQueue.put(new Object ()); 437 } 438 439 public long getObjectIDSequenceStart() { 440 return start; 441 } 442 443 public long getObjectIDSequenceEnd() { 444 return end; 445 } 446 447 public boolean getPersistentServer() { 448 return persistent; 449 } 450 451 public void initialize(long startOid, long endOid, boolean isPersistent, MessageChannel[] channels) { 452 this.start = startOid; 453 this.end = endOid; 454 this.persistent = isPersistent; 455 } 456 457 public MessageChannel getChannel() { 458 return channel; 459 } 460 461 public String [] getAllNodes() { 462 throw new ImplementMe(); 463 } 464 465 public String getThisNodeId() { 466 throw new ImplementMe(); 467 } 468 469 } 470 471 } 472 | Popular Tags |