package com.tc.net.protocol.transport;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;

import com.tc.config.schema.dynamic.FixedValueConfigItem;
import com.tc.exception.ImplementMe;
import com.tc.exception.TCInternalError;
import com.tc.exception.TCRuntimeException;
import com.tc.logging.TCLogger;
import com.tc.logging.TCLogging;
import com.tc.net.MaxConnectionsExceededException;
import com.tc.net.core.ConfigBasedConnectionAddressProvider;
import com.tc.net.core.ConnectionAddressProvider;
import com.tc.net.core.ConnectionInfo;
import com.tc.net.core.TCConnection;
import com.tc.net.core.TCConnectionManager;
import com.tc.net.core.event.TCConnectionEvent;
import com.tc.net.protocol.NetworkStackID;
import com.tc.net.protocol.TCNetworkMessage;
import com.tc.net.protocol.TCProtocolAdaptor;
import com.tc.util.Assert;
import com.tc.util.TCTimeoutException;
import com.tc.util.concurrent.TCExceptionResultException;
import com.tc.util.concurrent.TCFuture;

import java.io.IOException;
import java.util.List;

/**
 * Client side of the message transport. It opens a connection through a {@link ClientConnectionEstablisher} and
 * performs the SYN / SYN_ACK / ACK handshake with the peer. State such as <code>status</code>,
 * <code>isOpen</code>, <code>connectionId</code> and <code>messageFactory</code> is inherited from the superclass.
 */
public class ClientMessageTransport extends MessageTransportBase {
  private static final TCLogger             logger           = TCLogging.getLogger(ClientMessageTransport.class);

  /** Timeout handed to {@link TCFuture#get(long)} while waiting for the SYN_ACK; presumably milliseconds - confirm. */
  private static final long                 SYN_ACK_TIMEOUT  = 120000;

  /** Zero selects the "no reconnect" branch of {@link #closeEvent(TCConnectionEvent)}. */
  private final int                         maxReconnectTries;
  private final ClientConnectionEstablisher connectionEstablisher;

  /** Set once {@link #open()} has completed its handshake; never reset in this class. */
  private boolean                           wasOpened        = false;

  /** Future completed by {@link #handleSynAck(WireProtocolMessage)}; recreated for every SYN that is sent. */
  private TCFuture                          waitForSynAckResult;
  private final ConnectionAddressProvider   connAddressProvider;
  private final WireProtocolAdaptorFactory  wireProtocolAdaptorFactory;

  /** True for the duration of an {@link #open()} call, so close events arriving meanwhile are not ignored. */
  private final SynchronizedBoolean         isOpening        = new SynchronizedBoolean(false);

  /**
   * Convenience constructor for a single, fixed server address. Wraps <code>connInfo</code> in a
   * {@link ConfigBasedConnectionAddressProvider} and delegates to the provider-based constructor.
   */
  public ClientMessageTransport(int maxReconnectTries, ConnectionInfo connInfo, int timeout,
                                TCConnectionManager connManager, TransportHandshakeErrorHandler handshakeErrorHandler,
                                TransportHandshakeMessageFactory messageFactory,
                                WireProtocolAdaptorFactory wireProtocolAdaptorFactory) {
    this(maxReconnectTries,
         new ConfigBasedConnectionAddressProvider(new FixedValueConfigItem(new ConnectionInfo[] { connInfo })),
         timeout, connManager, handshakeErrorHandler, messageFactory, wireProtocolAdaptorFactory);
  }

  /**
   * Creates a transport in the start state together with the {@link ClientConnectionEstablisher} it uses to connect.
   *
   * @param maxReconnectTries passed to the connection establisher; zero also disables the reconnect handling in
   *        {@link #closeEvent(TCConnectionEvent)}
   * @param connInfoProvider source of the addresses to connect to
   * @param timeout passed through to the connection establisher
   * @param connManager passed through to the connection establisher
   * @param handshakeErrorHandler passed to the superclass
   * @param messageFactory passed to the superclass; used here to build SYN and ACK messages
   * @param wireProtocolAdaptorFactory factory used by {@link #getProtocolAdapter()}
   */
  public ClientMessageTransport(int maxReconnectTries, ConnectionAddressProvider connInfoProvider, int timeout,
                                TCConnectionManager connManager, TransportHandshakeErrorHandler handshakeErrorHandler,
                                TransportHandshakeMessageFactory messageFactory,
                                WireProtocolAdaptorFactory wireProtocolAdaptorFactory) {

    super(MessageTransportState.STATE_START, handshakeErrorHandler, messageFactory, false, logger);
    this.maxReconnectTries = maxReconnectTries;
    this.connAddressProvider = connInfoProvider;
    this.wireProtocolAdaptorFactory = wireProtocolAdaptorFactory;

    this.connectionEstablisher = new ClientConnectionEstablisher(this, connManager, connAddressProvider, logger,
                                                                 maxReconnectTries, timeout);
  }

  /**
   * Opens the connection and performs the handshake. Runs while holding the lock on <code>isOpen</code>.
   *
   * @return the network stack id built from the channel id of the negotiated connection id
   * @throws TCTimeoutException propagated from the handshake; the status is reset first
   * @throws IOException propagated from establishing the connection; the status is reset first
   * @throws MaxConnectionsExceededException if the SYN_ACK reports that the connection limit was exceeded
   */
  public NetworkStackID open() throws TCTimeoutException, IOException, MaxConnectionsExceededException {
    // Raised before taking the isOpen lock and lowered in the finally block below.
    isOpening.set(true);
    synchronized (isOpen) {
      Assert.eval("can't open an already open transport", !isOpen.get());
      try {
        connectionEstablisher.open();
        HandshakeResult result = handShake();
        if (result.isMaxConnectionsExceeded()) {
          // Detach the listeners while the connection is cleared and re-attach them afterwards -
          // presumably so that clearing does not notify them; confirm against clearConnection().
          List tl = this.getTransportListeners();
          this.removeTransportListeners();
          clearConnection();
          this.addTransportListeners(tl);
          status.reset();
          throw new MaxConnectionsExceededException(getMaxConnectionsExceededMessage(result.maxConnections()));
        }
        Assert.eval(!this.connectionId.isNull());
        isOpen.set(true);
        wasOpened = true;
        sendAck();
        return new NetworkStackID(this.connectionId.getChannelID());
      } catch (TCTimeoutException e) {
        status.reset();
        throw e;
      } catch (IOException e) {
        status.reset();
        throw e;
      } finally {
        isOpening.set(false);
      }
    }
  }

  /**
   * @return true if {@link #open()} has ever completed its handshake on this transport
   */
  public boolean wasOpened() {
    return wasOpened;
  }

  /**
   * Despite its name, returns true only when the transport is neither open nor in the middle of
   * {@link #open()}. {@link #closeEvent(TCConnectionEvent)} uses it to ignore close events in that situation.
   * <p>
   * NOTE(review): the name reads inverted relative to the result. It is left as is because the method is public
   * and callers depend on the current semantics.
   */
  public boolean isOpen() {
    return !isOpening.get() && !isOpen.get();
  }

  /**
   * Handles a connection close event. The event is ignored when {@link #isOpen()} reports that the transport is
   * neither open nor opening. Otherwise, with reconnects enabled the status is reset and a disconnected event is
   * fired; with reconnects disabled the superclass handles the event and the status is moved to its end state.
   */
  public void closeEvent(TCConnectionEvent event) {
    // isOpen() is true when the transport is NOT open and NOT opening: nothing to do in that case.
    if (isOpen()) return;

    TCConnection src = event.getSource();
    Assert.assertSame(getConnection(), src);

    if (maxReconnectTries != 0) {
      if (logger.isDebugEnabled()) {
        logger.debug("Caught connection close event: " + event);
      }
      status.reset();
      fireTransportDisconnectedEvent();
    } else {
      super.closeEvent(event);

      synchronized (status) {
        if (!status.isEnd()) status.end();
      }
    }
  }

  /**
   * Receives a message from the wire. While a SYN is outstanding the message is consumed as the SYN_ACK and
   * recycled; otherwise it is handed to the receive layer, outside the status lock.
   */
  protected void receiveTransportMessageImpl(WireProtocolMessage message) {
    synchronized (status) {
      if (status.isSynSent()) {
        handleSynAck(message);
        message.recycle();
        return;
      }
    }
    super.receiveToReceiveLayer(message);
  }

  /**
   * Validates the SYN_ACK, records the connection id it carries (unless the connection limit was exceeded) and
   * completes the future that {@link #waitForSynAck()} is blocked on. A message that is not a SYN_ACK is reported
   * through <code>handleHandshakeError</code> and does not complete the future.
   */
  private void handleSynAck(WireProtocolMessage message) {
    if (!verifySynAck(message)) {
      handleHandshakeError(new TransportHandshakeErrorContext(
                                                              "Received a message that was not a SYN_ACK while waiting for SYN_ACK: "
                                                                  + message));
    } else {
      SynAckMessage synAck = (SynAckMessage) message;
      if (synAck.hasErrorContext()) { throw new ImplementMe(synAck.getErrorContext()); }

      if (connectionId != null && !ConnectionID.NULL_ID.equals(connectionId)) {
        // An id is already assigned (presumably a reconnect), so the peer must answer with the same id.
        Assert.eval(connectionId.equals(synAck.getConnectionId()));
      }
      if (!synAck.isMaxConnectionsExceeded()) {
        this.connectionId = synAck.getConnectionId();

        Assert.assertNotNull("Connection id from the server was null!", this.connectionId);
        Assert.eval(!ConnectionID.NULL_ID.equals(this.connectionId));
        Assert.assertNotNull(this.waitForSynAckResult);
      }

      this.waitForSynAckResult.set(synAck);
    }
  }

  /**
   * @return true if the message is a {@link TransportHandshakeMessage} that reports itself as a SYN_ACK
   */
  private boolean verifySynAck(TCNetworkMessage message) {
    return message instanceof TransportHandshakeMessage && ((TransportHandshakeMessage) message).isSynAck();
  }

  /**
   * Sends a SYN and blocks until the SYN_ACK arrives. The ACK is not sent here; callers send it once they have
   * inspected the result.
   *
   * @return the max-connections information carried by the SYN_ACK
   * @throws TCTimeoutException propagated from waiting for the SYN_ACK
   */
  HandshakeResult handShake() throws TCTimeoutException {
    sendSyn();
    SynAckMessage synAck = waitForSynAck();
    return new HandshakeResult(synAck.isMaxConnectionsExceeded(), synAck.getMaxConnections());
  }

  /**
   * Blocks on the SYN_ACK future for up to {@link #SYN_ACK_TIMEOUT}.
   *
   * @throws TCTimeoutException propagated from {@link TCFuture#get(long)}
   * @throws TCRuntimeException if the waiting thread is interrupted; the interrupt status is restored first
   * @throws TCInternalError if the future was completed with an exception
   */
  private SynAckMessage waitForSynAck() throws TCTimeoutException {
    try {
      return (SynAckMessage) waitForSynAckResult.get(SYN_ACK_TIMEOUT);
    } catch (InterruptedException e) {
      // Keep the interruption visible to code further up the stack instead of silently clearing it.
      Thread.currentThread().interrupt();
      throw new TCRuntimeException(e);
    } catch (TCExceptionResultException e) {
      throw new TCInternalError(e);
    }
  }

  /**
   * Creates a fresh SYN_ACK future, sends the SYN and moves the status to "SYN sent", all under the status lock.
   * Throws an {@link AssertionError} if the status is already established or a SYN is already outstanding.
   */
  private void sendSyn() {
    synchronized (status) {
      if (status.isEstablished() || status.isSynSent()) { throw new AssertionError(" ERROR !!! " + status); }
      // The future must exist before the SYN goes out, because handleSynAck() completes it.
      waitForSynAckResult = new TCFuture(status);
      TransportHandshakeMessage syn = this.messageFactory.createSyn(this.connectionId, getConnection());
      this.sendToConnection(syn);
      this.status.synSent();
    }
  }

  /**
   * Sends the ACK, moves the status to established and fires the connected event, all under the status lock.
   * Requires that a SYN has been sent.
   */
  private void sendAck() {
    synchronized (status) {
      Assert.eval(status.isSynSent());
      TransportHandshakeMessage ack = this.messageFactory.createAck(this.connectionId, getConnection());
      this.sendToConnection(ack);
      this.status.established();
      fireTransportConnectedEvent();
    }
  }

  /**
   * Repeats the handshake on a transport that is currently not connected.
   *
   * @throws MaxConnectionsExceededException if the SYN_ACK reports that the connection limit was exceeded; the
   *         transport is closed first
   * @throws Exception any failure of the handshake; the status is reset before it is rethrown
   */
  void reconnect() throws Exception {
    Assert.eval(!isConnected());
    try {
      HandshakeResult result = handShake();
      // Check for rejection BEFORE acknowledging, as open() does: sendAck() marks the transport established
      // and fires the connected event, which must not happen for a connection that is about to be closed.
      if (result.isMaxConnectionsExceeded()) {
        close();
        throw new MaxConnectionsExceededException(getMaxConnectionsExceededMessage(result.maxConnections()));
      }
      sendAck();
    } catch (Exception t) {
      status.reset();
      throw t;
    }
  }

  /** Builds the message used for every {@link MaxConnectionsExceededException} thrown by this class. */
  private String getMaxConnectionsExceededMessage(int maxConnections) {
    return "Maximum number of client connections exceeded: " + maxConnections;
  }

  /**
   * @return a new wire protocol adaptor whose sink forwards every message to <code>receiveTransportMessage</code>
   */
  TCProtocolAdaptor getProtocolAdapter() {
    return wireProtocolAdaptorFactory.newWireProtocolAdaptor(new WireProtocolMessageSink() {
      public void putMessage(WireProtocolMessage message) {
        receiveTransportMessage(message);
      }
    });
  }

  /**
   * Moves the status to its end state if the transport is not connected and not already ended. Runs under the
   * status lock.
   */
  void endIfDisconnected() {
    synchronized (this.status) {
      if (!this.isConnected()) {
        if (!this.status.isEnd()) {
          this.status.end();
        }
      }
    }

  }

  /** Immutable holder for the max-connections information carried by a SYN_ACK. */
  private static final class HandshakeResult {
    private final boolean maxConnectionsExceeded;
    private final int     maxConnections;

    private HandshakeResult(boolean maxConnectionsExceeded, int maxConnections) {
      this.maxConnectionsExceeded = maxConnectionsExceeded;
      this.maxConnections = maxConnections;
    }

    public int maxConnections() {
      return this.maxConnections;
    }

    public boolean isMaxConnectionsExceeded() {
      return this.maxConnectionsExceeded;
    }
  }

}
| Popular Tags
|