1 5 package com.tc.net.protocol.transport; 6 7 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 8 import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef; 9 10 import com.tc.exception.ImplementMe; 11 import com.tc.net.TCSocketAddress; 12 import com.tc.net.core.ConnectionInfo; 13 import com.tc.net.core.MockConnectionManager; 14 import com.tc.net.core.MockTCConnection; 15 import com.tc.net.core.TCConnection; 16 import com.tc.net.core.event.TCConnectionEvent; 17 import com.tc.net.protocol.PlainNetworkStackHarnessFactory; 18 import com.tc.net.protocol.tcm.CommunicationsManagerImpl; 19 import com.tc.net.protocol.tcm.MessageChannel; 20 import com.tc.net.protocol.tcm.NetworkListener; 21 import com.tc.net.protocol.tcm.NullMessageMonitor; 22 import com.tc.object.session.NullSessionManager; 23 import com.tc.test.TCTestCase; 24 import com.tc.util.TCAssertionError; 25 26 import java.util.Collections ; 27 28 31 public class MessageTransportTest extends TCTestCase { 32 private SynchronizedRef clientErrorRef; 33 private SynchronizedRef serverErrorRef; 34 private MessageChannel channel; 35 private ClientHandshakeMessageResponder clientResponder; 36 private ServerHandshakeMessageResponder serverResponder; 37 private LinkedQueue clientResponderReceivedQueue; 38 private LinkedQueue clientResponderSentQueue; 39 private LinkedQueue serverResponderReceivedQueue; 40 private LinkedQueue serverResponderSentQueue; 41 private TransportEventMonitor clientEventMonitor; 42 private TransportEventMonitor serverEventMonitor; 43 private ClientMessageTransport clientTransport; 44 private ServerMessageTransport serverTransport; 45 private ConnectionID connectionId; 46 private MockTCConnection clientConnection; 47 private MockConnectionManager connManager; 48 private CommunicationsManagerImpl commsManager; 49 private NetworkListener lsnr; 50 private TransportHandshakeMessageFactory transportHandshakeMessageFactory; 51 private MockTCConnection serverConnection; 52 53 public void setUp() throws Exception { 54 this.clientResponderReceivedQueue = new LinkedQueue(); 55 this.clientResponderSentQueue = new LinkedQueue(); 56 this.serverResponderReceivedQueue = new LinkedQueue(); 57 this.serverResponderSentQueue = new LinkedQueue(); 58 this.clientErrorRef = new SynchronizedRef(null); 59 this.serverErrorRef = new SynchronizedRef(null); 60 DefaultConnectionIdFactory connectionIDProvider = new DefaultConnectionIdFactory(); 61 this.connectionId = connectionIDProvider.nextConnectionId(); 62 63 this.transportHandshakeMessageFactory = new TransportHandshakeMessageFactoryImpl(); 64 serverConnection = new MockTCConnection(); 65 serverConnection.isConnected(true); 66 67 clientConnection = new MockTCConnection(); 68 clientConnection.remoteAddress = new TCSocketAddress("localhost", 0); 69 clientConnection.localAddress = new TCSocketAddress("localhost", 0); 70 connManager = new MockConnectionManager(); 71 connManager.setConnection(clientConnection); 72 commsManager = new CommunicationsManagerImpl(new NullMessageMonitor(), new PlainNetworkStackHarnessFactory(), 73 connManager, new NullConnectionPolicy()); 74 lsnr = commsManager.createListener(new NullSessionManager(), new TCSocketAddress(0), true, 75 new DefaultConnectionIdFactory()); 76 lsnr.start(Collections.EMPTY_SET); 77 } 78 79 public void tearDown() throws Exception { 80 super.tearDown(); 81 assertTrue(this.clientErrorRef.get() == null); 82 assertTrue(this.serverErrorRef.get() == null); 83 if (channel != null) channel.close(); 84 lsnr.stop(5000); 85 commsManager.shutdown(); 86 } 87 88 public void testDoubleOpen() throws Exception { 89 90 createClientTransport(0); 91 createServerTransport(); 92 93 assertFalse(clientTransport.wasOpened()); 94 95 99 clientTransport.open(); 100 createClientTransport(0); 101 clientTransport.open(); 102 assertEquals(2, connManager.getCreateConnectionCallCount()); 103 104 try { 107 clientTransport.open(); 108 fail("Should have thrown an assertion error."); 109 } catch (TCAssertionError e) { 110 } 112 assertEquals(2, connManager.getCreateConnectionCallCount()); 113 } 114 115 public void testDoubleClose() throws Exception { 116 createClientTransport(0); 117 118 clientTransport.open(); 119 clientTransport.close(); 120 assertEquals(1, clientConnection.getCloseCallCount()); 121 122 try { 123 clientTransport.close(); 124 fail("Should have thrown an AssertionError"); 125 } catch (TCAssertionError e) { 126 } 128 } 129 130 public void testClientTransportEvents() throws Exception { 131 createClientTransport(1); 132 133 TransportEventMonitor extraMonitor = new TransportEventMonitor(); 137 this.clientTransport.addTransportListener(extraMonitor); 138 139 clientTransport.open(); 140 assertTrue(clientEventMonitor.waitForConnect(1000)); 141 assertTrue(extraMonitor.waitForConnect(1000)); 142 143 TCConnectionEvent event = new TCConnectionEvent() { 144 public TCConnection getSource() { 145 return clientConnection; 146 } 147 }; 148 149 clientTransport.closeEvent(event); 150 assertTrue(clientEventMonitor.waitForDisconnect(1000)); 151 assertTrue(extraMonitor.waitForDisconnect(1000)); 152 assertTrue(clientEventMonitor.waitForConnectAttempt(1000)); 153 assertTrue(extraMonitor.waitForConnectAttempt(1000)); 154 155 clientTransport.close(); 156 assertTrue(clientEventMonitor.waitForClose(1000)); 157 assertTrue(extraMonitor.waitForClose(1000)); 158 } 159 160 public void testServerTransportEvents() throws Exception { 161 createServerTransport(); 162 assertFalse(serverEventMonitor.waitForConnect(500)); 163 164 TransportEventMonitor extraMonitor = new TransportEventMonitor(); 168 serverTransport.addTransportListener(extraMonitor); 169 170 TCConnectionEvent event = new TCConnectionEvent() { 171 public TCConnection getSource() { 172 return serverConnection; 173 } 174 }; 175 176 serverTransport.closeEvent(event); 177 assertTrue(serverEventMonitor.waitForDisconnect(1000)); 178 assertTrue(extraMonitor.waitForDisconnect(1000)); 179 180 assertFalse(serverEventMonitor.waitForConnectAttempt(1000)); 181 assertFalse(extraMonitor.waitForConnectAttempt(1000)); 182 183 serverTransport.close(); 184 assertTrue(serverEventMonitor.waitForClose(1000)); 185 assertTrue(extraMonitor.waitForClose(1000)); 186 } 187 188 public void testCloseBeforeOpen() throws Exception { 189 createClientTransport(0); 190 try { 191 clientTransport.close(); 192 fail("Should have thrown an assertion error."); 193 } catch (TCAssertionError e) { 194 } 196 } 197 198 public void testWasOpened() throws Exception { 199 createClientTransport(0); 200 assertFalse(clientTransport.wasOpened()); 201 202 clientTransport.open(); 203 assertTrue(clientTransport.wasOpened()); 204 205 clientTransport.close(); 206 assertTrue(clientTransport.wasOpened()); 207 } 208 209 public void testOpenCloseAndIsConnected() throws Exception { 210 createClientTransport(0); 211 assertFalse(clientTransport.isConnected()); 212 213 clientTransport.open(); 214 assertTrue(clientTransport.isConnected()); 215 216 clientTransport.close(); 217 assertTrue(clientEventMonitor.waitForClose(1000)); 218 assertFalse(clientTransport.isOpen.get()); 219 220 createServerTransport(); 221 TransportHandshakeMessage ack = this.transportHandshakeMessageFactory.createAck(connectionId, this.serverTransport 222 .getConnection()); 223 this.serverTransport.receiveTransportMessage(ack); 224 assertTrue(serverEventMonitor.waitForConnect(1000)); 225 assertTrue(serverTransport.isConnected()); 226 } 227 228 private void createServerTransport() throws Exception { 229 this.serverTransport = new ServerMessageTransport(this.connectionId, this.serverConnection, 230 createHandshakeErrorHandler(), 231 this.transportHandshakeMessageFactory); 232 233 this.serverResponder = new ServerHandshakeMessageResponder(this.serverResponderSentQueue, 234 this.serverResponderReceivedQueue, 235 this.transportHandshakeMessageFactory, 236 this.connectionId, serverTransport, this.serverErrorRef); 237 this.serverConnection.setMessageSink(this.serverResponder); 238 this.serverEventMonitor = new TransportEventMonitor(); 239 this.serverTransport.addTransportListener(this.serverEventMonitor); 240 } 241 242 private void createClientTransport(int maxReconnectTries) throws Exception { 243 this.clientTransport = new ClientMessageTransport(maxReconnectTries, 244 new ConnectionInfo(TCSocketAddress.LOOPBACK_IP, 0), 0, 245 connManager, createHandshakeErrorHandler(), 246 this.transportHandshakeMessageFactory, 247 new WireProtocolAdaptorFactoryImpl()); 248 this.clientResponder = new ClientHandshakeMessageResponder(this.clientResponderSentQueue, 249 this.clientResponderReceivedQueue, 250 this.transportHandshakeMessageFactory, 251 this.connectionId, this.clientTransport, 252 this.clientErrorRef); 253 this.clientConnection.setMessageSink(this.clientResponder); 254 this.clientEventMonitor = new TransportEventMonitor(); 255 this.clientTransport.addTransportListener(this.clientEventMonitor); 256 } 257 258 private TransportHandshakeErrorHandler createHandshakeErrorHandler() { 259 return new TransportHandshakeErrorHandler() { 260 261 public void handleHandshakeError(TransportHandshakeErrorContext e) { 262 new ImplementMe(e.toString()).printStackTrace(); 263 } 264 265 public void handleHandshakeError(TransportHandshakeErrorContext e, TransportHandshakeMessage m) { 266 throw new ImplementMe(); 267 268 } 269 270 }; 271 } 272 273 } 274 | Popular Tags |