1 5 package com.tc.net.protocol.tcm; 6 7 import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef; 8 9 import com.tc.config.schema.dynamic.FixedValueConfigItem; 10 import com.tc.logging.TCLogger; 11 import com.tc.logging.TCLogging; 12 import com.tc.net.TCSocketAddress; 13 import com.tc.net.core.ConnectionInfo; 14 import com.tc.net.protocol.NetworkStackHarnessFactory; 15 import com.tc.net.protocol.PlainNetworkStackHarnessFactory; 16 import com.tc.net.protocol.tcm.msgs.PingMessage; 17 import com.tc.net.protocol.transport.DefaultConnectionIdFactory; 18 import com.tc.net.protocol.transport.NullConnectionPolicy; 19 import com.tc.object.session.NullSessionManager; 20 import com.tc.test.TCTestCase; 21 import com.tc.util.SequenceGenerator; 22 import com.tc.util.TCTimeoutException; 23 import com.tc.util.concurrent.ThreadUtil; 24 25 import gnu.trove.TLongHashSet; 26 27 import java.io.IOException ; 28 import java.net.UnknownHostException ; 29 import java.text.SimpleDateFormat ; 30 import java.util.Date ; 31 import java.util.HashSet ; 32 import java.util.LinkedList ; 33 import java.util.List ; 34 import java.util.Random ; 35 36 40 public class MessageChannelTest extends TCTestCase { 41 static final int ITERATIONS = 100; 42 static final int WAIT_PERIOD = 100; 43 static final int WAIT = ITERATIONS * WAIT_PERIOD; 44 static final int MESSAGE_COUNT = 250; 45 46 TCLogger logger = TCLogging.getLogger(getClass()); 47 NetworkListener lsnr; 48 CommunicationsManager clientComms; 49 CommunicationsManager serverComms; 50 ClientMessageChannel clientChannel; 51 SequenceGenerator sequence; 52 MessageSendAndReceiveWatcher clientWatcher; 53 MessageSendAndReceiveWatcher serverWatcher; 54 SynchronizedRef error = new SynchronizedRef(null); 55 SequenceGenerator sq = new SequenceGenerator(); 56 57 private int port = 0; 58 59 63 protected void setUp(int maxReconnectTries) throws Exception { 64 setUp(maxReconnectTries, new PlainNetworkStackHarnessFactory(), new PlainNetworkStackHarnessFactory()); 65 } 66 67 protected void setUp(int maxReconnectTries, NetworkStackHarnessFactory clientStackHarnessFactory, 68 NetworkStackHarnessFactory serverStackHarnessFactory) throws Exception { 69 super.setUp(); 70 71 clientWatcher = new MessageSendAndReceiveWatcher(); 72 serverWatcher = new MessageSendAndReceiveWatcher(); 73 74 this.sequence = new SequenceGenerator(); 75 76 MessageMonitor mm = new NullMessageMonitor(); 77 clientComms = new CommunicationsManagerImpl(mm, clientStackHarnessFactory, new NullConnectionPolicy()); 78 serverComms = new CommunicationsManagerImpl(mm, serverStackHarnessFactory, new NullConnectionPolicy()); 79 80 initListener(clientWatcher, serverWatcher); 81 this.clientChannel = createClientMessageChannel(maxReconnectTries); 82 this.setUpClientReceiveSink(); 83 } 84 85 private void initListener(final MessageSendAndReceiveWatcher myClientSenderWatcher, 86 final MessageSendAndReceiveWatcher myServerSenderWatcher) throws IOException , 87 TCTimeoutException { 88 if (lsnr != null) { 89 lsnr.stop(WAIT); 90 } 91 92 lsnr = serverComms.createListener(new NullSessionManager(), new TCSocketAddress(port), false, 93 new DefaultConnectionIdFactory()); 94 lsnr.addClassMapping(TCMessageType.PING_MESSAGE, PingMessage.class); 95 lsnr.routeMessageType(TCMessageType.PING_MESSAGE, new TCMessageSink() { 96 public void putMessage(TCMessage message) throws UnsupportedMessageTypeException { 97 99 PingMessage ping = (PingMessage) message; 100 try { 101 message.hydrate(); 102 } catch (Exception e) { 103 setError(e); 104 } 105 myClientSenderWatcher.addMessageReceived(ping); 106 107 PingMessage pong = ping.createResponse(); 108 pong.send(); 109 myServerSenderWatcher.addMessageSent(pong); 110 } 111 }); 112 lsnr.start(new HashSet()); 113 this.port = lsnr.getBindPort(); 114 } 115 116 protected void tearDown() throws Exception { 117 super.tearDown(); 118 119 final Throwable lastError = (Throwable ) error.get(); 120 if (lastError != null) { throw new Exception (lastError); } 121 122 if (lsnr != null) lsnr.stop(WAIT); 123 if (this.clientChannel != null) this.clientChannel.close(); 124 if (clientComms != null) clientComms.shutdown(); 125 if (serverComms != null) serverComms.shutdown(); 126 } 127 128 public void testAttachments() throws Exception { 129 setUp(10); 130 String key = "key"; 131 MessageChannel channel = createClientMessageChannel(10); 132 assertNull(channel.getAttachment(key)); 133 assertNull(channel.removeAttachment(key)); 134 135 Object attachment = new Object (); 136 Object attachment2 = new Object (); 137 channel.addAttachment(key, attachment, false); 138 assertSame(attachment, channel.getAttachment(key)); 139 channel.addAttachment(key, attachment, false); 140 assertSame(attachment, channel.getAttachment(key)); 141 142 channel.addAttachment(key, attachment2, true); 143 assertSame(attachment2, channel.getAttachment(key)); 144 145 Object removed = channel.removeAttachment(key); 146 assertSame(attachment2, removed); 147 148 removed = channel.removeAttachment(key); 149 assertNull(removed); 150 assertNull(channel.getAttachment(key)); 151 } 152 153 public void testAutomaticReconnect() throws Exception { 154 setUp(10); 155 assertEquals(0, clientChannel.getConnectCount()); 156 assertEquals(0, clientChannel.getConnectAttemptCount()); 157 clientChannel.open(); 158 assertEquals(1, clientChannel.getConnectCount()); 159 assertEquals(1, clientChannel.getConnectAttemptCount()); 160 161 final int closeCount = new Random ().nextInt(MESSAGE_COUNT); 162 163 for (int i = 0; i < MESSAGE_COUNT; i++) { 164 if (i == closeCount) { 165 waitForArrivalOrFail(clientWatcher, i); 166 waitForArrivalOrFail(serverWatcher, i); 167 clientComms.getConnectionManager().closeAllConnections(WAIT); 168 if (!waitUntilReconnected()) { 169 fail("Didn't reconnect"); 170 } 171 } 172 createAndSendMessage(); 173 } 174 assertTrue(clientChannel.getConnectAttemptCount() > 1); 175 assertTrue(clientChannel.getConnectCount() > 1); 176 177 waitForMessages(MESSAGE_COUNT); 178 } 179 180 private void waitForMessages(int count) throws InterruptedException { 181 waitForArrivalOrFail(clientWatcher, count); 182 waitForArrivalOrFail(serverWatcher, count); 183 184 String msg = "expected: " + count + ", client sent: " + clientWatcher.sent() + ", client received: " 185 + clientWatcher.received() + ", server sent: " + serverWatcher.sent() + ", server received: " 186 + serverWatcher.received(); 187 188 assertEquals(msg, count, clientWatcher.sent()); 189 assertEquals(msg, count, clientWatcher.received()); 190 assertEquals(msg, count, serverWatcher.sent()); 191 assertEquals(msg, count, serverWatcher.received()); 192 } 193 194 public void testManualReconnectAfterFailure() throws Exception { 195 setUp(0); 196 197 lsnr.stop(WAIT); 198 serverComms.getConnectionManager().closeAllConnections(WAIT); 199 clientComms.getConnectionManager().closeAllConnections(WAIT); 200 201 for (int i = 0; i < 10; i++) { 202 try { 203 clientChannel.open(); 204 fail("Should have thrown an exception"); 205 } catch (TCTimeoutException e) { 206 } catch (UnknownHostException e) { 208 fail(e.getMessage()); 209 } catch (IOException e) { 210 } 212 213 assertFalse(clientChannel.isConnected()); 214 } 215 216 initListener(this.clientWatcher, this.serverWatcher); 217 clientChannel.open(); 218 assertTrue(clientChannel.isConnected()); 219 } 220 221 public void testSendAfterDisconnect() throws Exception { 222 setUp(0); 223 clientChannel.open(); 224 225 createAndSendMessage(); 226 waitForArrivalOrFail(clientWatcher, 1); 227 waitForArrivalOrFail(serverWatcher, 1); 228 229 sendMessagesWhileDisconnected(MESSAGE_COUNT, 25); 230 231 ThreadUtil.reallySleep(5000); 233 } 234 235 public void testZeroMaxRetriesDoesntAutoreconnect() throws Exception { 236 setUp(0); 237 assertEquals(0, clientChannel.getConnectAttemptCount()); 238 assertEquals(0, clientChannel.getConnectCount()); 239 240 clientChannel.open(); 241 assertEquals(1, clientChannel.getConnectAttemptCount()); 242 assertEquals(1, clientChannel.getConnectCount()); 243 clientComms.getConnectionManager().closeAllConnections(WAIT); 244 ThreadUtil.reallySleep(5000); 245 assertEquals(1, clientChannel.getConnectAttemptCount()); 246 assertEquals(1, clientChannel.getConnectCount()); 247 } 248 249 public void testNegativeMaxRetriesAlwaysReconnects() throws Exception { 250 setUp(-1); 251 252 assertEquals(0, clientChannel.getConnectCount()); 253 assertEquals(0, clientChannel.getConnectAttemptCount()); 254 255 clientChannel.open(); 256 257 assertEquals(1, clientChannel.getConnectCount()); 258 assertEquals(1, clientChannel.getConnectAttemptCount()); 259 260 lsnr.stop(WAIT); 261 assertEquals(0, serverComms.getAllListeners().length); 262 263 clientComms.getConnectionManager().closeAllConnections(5000); 264 int count = clientChannel.getConnectAttemptCount(); 265 ThreadUtil.reallySleep(WAIT * 4); 266 assertTrue(clientChannel.getConnectAttemptCount() + " vs " + count, clientChannel.getConnectAttemptCount() > count); 267 assertEquals(1, clientChannel.getConnectCount()); 268 } 269 270 300 public void testGetStatus() throws Exception { 301 setUp(0); 302 clientChannel.open(); 303 assertTrue(clientChannel.isOpen()); 304 clientChannel.close(); 305 assertTrue(clientChannel.isClosed()); 306 } 307 308 public void testSend() throws Exception { 309 setUp(0); 310 clientChannel.open(); 311 int count = 100; 312 List messages = new LinkedList (); 313 for (int i = 0; i < count; i++) { 314 messages.add(createAndSendMessage()); 315 } 316 waitForMessages(count); 317 318 } 319 320 public void testSocketInfo() throws Exception { 321 setUp(0); 322 323 assertNull(clientChannel.getRemoteAddress()); 324 assertNull(clientChannel.getLocalAddress()); 325 326 clientChannel.open(); 327 createAndSendMessage(); 328 waitForMessages(1); 329 330 TCSocketAddress clientRemote = clientChannel.getRemoteAddress(); 331 TCSocketAddress clientLocal = clientChannel.getLocalAddress(); 332 333 MessageChannelInternal[] serverChannels = lsnr.getChannelManager().getChannels(); 334 assertEquals(1, serverChannels.length); 335 MessageChannelInternal serverChannel = serverChannels[0]; 336 337 TCSocketAddress serverRemote = serverChannel.getRemoteAddress(); 338 TCSocketAddress serverLocal = serverChannel.getLocalAddress(); 339 340 assertEquals(clientRemote, serverLocal); 341 assertEquals(clientLocal, serverRemote); 342 } 343 344 private PingMessage createAndSendMessage() { 345 PingMessage ping = createMessage(); 346 clientWatcher.addMessageSent(ping); 347 ping.send(); 348 return ping; 349 } 350 351 private static void waitForArrivalOrFail(MessageSendAndReceiveWatcher watcher, int count) throws InterruptedException { 352 int i = 0; 353 while (!watcher.allReceived() || (watcher.sent() < count) || (watcher.received() < count)) { 354 if (i == ITERATIONS) { 355 fail((watcher.sent() - watcher.received()) + " messages of " + watcher.sent() 356 + " messages total failed to arrive in " + ITERATIONS + " iterations of " + WAIT_PERIOD + " ms. waiting."); 357 } 358 359 Thread.sleep(WAIT_PERIOD); 360 i++; 361 } 362 } 363 364 private ClientMessageChannel createClientMessageChannel(int maxReconnectTries) { 365 ClientMessageChannel ch = clientComms 366 .createClientChannel(new NullSessionManager(), maxReconnectTries, TCSocketAddress.LOOPBACK_IP, lsnr 367 .getBindPort(), WAIT, new FixedValueConfigItem(new ConnectionInfo[] { new ConnectionInfo("localhost", lsnr 368 .getBindPort()) })); 369 ch.addClassMapping(TCMessageType.PING_MESSAGE, PingMessage.class); 370 return ch; 371 } 372 373 private PingMessage createMessage() { 374 PingMessage ping = (PingMessage) clientChannel.createMessage(TCMessageType.PING_MESSAGE); 375 ping.initialize(sq); 376 return ping; 377 } 378 379 private void sendMessagesWhileDisconnected(int count, int afterCount) throws InterruptedException { 380 Random rnd = new Random (); 381 final int closeCount = rnd.nextInt(count); 382 final boolean serverClose = rnd.nextBoolean(); 383 384 Thread thread = null; 385 386 for (int i = 0; i < count; i++) { 387 if (i == closeCount) { 388 thread = new Thread ("Connection closer thread") { 390 public void run() { 391 try { 392 if (serverClose) { 393 logger.info("Initiating close on the SERVER side..."); 394 serverComms.getConnectionManager().asynchCloseAllConnections(); 395 } else { 396 logger.info("Initiating close on the CLIENT side..."); 397 clientComms.getConnectionManager().asynchCloseAllConnections(); 398 } 399 } catch (Throwable t) { 400 setError(t); 401 } 402 } 403 }; 404 Thread.sleep(rnd.nextInt(25) + 10); 405 thread.setDaemon(true); 406 thread.start(); 407 } 408 409 createAndSendMessage(); 410 } 411 412 thread.join(WAIT); 413 assertFalse(thread.isAlive()); 414 415 for (int i = 0; i < afterCount; i++) { 417 createAndSendMessage(); 418 } 419 } 420 421 private boolean waitUntilReconnected() { 422 final long start = System.currentTimeMillis(); 423 while (System.currentTimeMillis() - start < WAIT) { 424 if (clientChannel.isConnected()) return true; 425 try { 426 Thread.sleep(WAIT_PERIOD); 427 } catch (InterruptedException e) { 428 e.printStackTrace(); 429 } 430 } 431 return false; 432 } 433 434 private void setUpClientReceiveSink() { 435 final MessageSendAndReceiveWatcher myServerSenderWatcher = this.serverWatcher; 436 clientChannel.routeMessageType(TCMessageType.PING_MESSAGE, new TCMessageSink() { 437 public void putMessage(TCMessage message) throws UnsupportedMessageTypeException { 438 try { 439 PingMessage ping = (PingMessage) message; 440 ping.hydrate(); 441 } catch (Exception e) { 443 setError(e); 444 } 445 PingMessage ping = (PingMessage) message; 446 myServerSenderWatcher.addMessageReceived(ping); 447 } 448 }); 449 } 450 451 private void setError(Throwable t) { 452 synchronized (System.err) { 453 System.err.println(new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss,S").format(new Date ()) 454 + ": Exception Thrown in thread [" + Thread.currentThread().getName() + "]"); 455 t.printStackTrace(System.err); 456 } 457 error.set(t); 458 } 459 460 public class MessageSendAndReceiveWatcher { 461 462 private TLongHashSet sentSequences = new TLongHashSet(); 463 private TLongHashSet receivedSequences = new TLongHashSet(); 464 465 public synchronized void addMessageSent(PingMessage sent) { 466 sentSequences.add(sent.getSequence()); 467 } 468 469 public synchronized void addMessageReceived(PingMessage received) { 470 receivedSequences.add(received.getSequence()); 471 } 472 473 public int sent() { 474 return sentSequences.size(); 475 } 476 477 public int received() { 478 return receivedSequences.size(); 479 } 480 481 public synchronized boolean allReceived() { 482 return receivedSequences.containsAll(sentSequences.toArray()); 483 } 484 } 485 } 486 | Popular Tags |