1 5 package com.tc.net.protocol.transport; 6 7 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 8 9 import com.tc.logging.CustomerLogging; 10 import com.tc.logging.TCLogger; 11 import com.tc.net.core.TCConnection; 12 import com.tc.net.protocol.NetworkStackHarness; 13 import com.tc.net.protocol.NetworkStackHarnessFactory; 14 import com.tc.net.protocol.ProtocolAdaptorFactory; 15 import com.tc.net.protocol.StackNotFoundException; 16 import com.tc.net.protocol.TCProtocolAdaptor; 17 import com.tc.net.protocol.tcm.ServerMessageChannelFactory; 18 import com.tc.util.Assert; 19 20 import java.util.ArrayList ; 21 import java.util.Iterator ; 22 import java.util.List ; 23 import java.util.Map ; 24 import java.util.Set ; 25 26 29 public class ServerStackProvider implements NetworkStackProvider, MessageTransportListener, ProtocolAdaptorFactory { 30 31 private final Map harnesses = new ConcurrentHashMap(); 32 private final NetworkStackHarnessFactory harnessFactory; 33 private final ServerMessageChannelFactory channelFactory; 34 private final TransportHandshakeMessageFactory handshakeMessageFactory; 35 private final ConnectionIDFactory connectionIdFactory; 36 private final ConnectionPolicy connectionPolicy; 37 private final WireProtocolAdaptorFactory wireProtocolAdaptorFactory; 38 private final MessageTransportFactory messageTransportFactory; 39 private final List transportListeners = new ArrayList (1); 40 private final TCLogger logger; 41 private final TCLogger consoleLogger = CustomerLogging.getConsoleLogger(); 42 43 public ServerStackProvider(TCLogger logger, Set initialConnectionIDs, NetworkStackHarnessFactory harnessFactory, 44 ServerMessageChannelFactory channelFactory, 45 MessageTransportFactory messageTransportFactory, 46 TransportHandshakeMessageFactory handshakeMessageFactory, 47 ConnectionIDFactory connectionIdFactory, ConnectionPolicy connectionPolicy, 48 WireProtocolAdaptorFactory wireProtocolAdaptorFactory) { 49 this.messageTransportFactory = messageTransportFactory; 50 this.connectionPolicy = connectionPolicy; 51 this.wireProtocolAdaptorFactory = wireProtocolAdaptorFactory; 52 Assert.assertNotNull(harnessFactory); 53 this.harnessFactory = harnessFactory; 54 this.channelFactory = channelFactory; 55 this.handshakeMessageFactory = handshakeMessageFactory; 56 this.connectionIdFactory = connectionIdFactory; 57 this.transportListeners.add(this); 58 this.logger = logger; 59 for (Iterator i = initialConnectionIDs.iterator(); i.hasNext();) { 60 ConnectionID connectionID = (ConnectionID) i.next(); 61 logger.info("Preparing comms stack for previously connected client: " + connectionID); 62 newStackHarness(connectionID, messageTransportFactory.createNewTransport(connectionID, 63 createHandshakeErrorHandler(), 64 handshakeMessageFactory, 65 transportListeners)); 66 } 67 } 68 69 public MessageTransport attachNewConnection(ConnectionID connectionId, TCConnection connection) 70 throws StackNotFoundException { 71 Assert.assertNotNull(connection); 72 73 final NetworkStackHarness harness; 74 final MessageTransport rv; 75 if (connectionId.isNull()) { 76 connectionId = connectionIdFactory.nextConnectionId(); 77 78 rv = messageTransportFactory.createNewTransport(connectionId, connection, createHandshakeErrorHandler(), 79 handshakeMessageFactory, transportListeners); 80 newStackHarness(connectionId, rv); 81 } else { 82 harness = (NetworkStackHarness) harnesses.get(connectionId); 83 84 if (harness == null) { 85 throw new StackNotFoundException(connectionId); 86 } else { 87 rv = harness.attachNewConnection(connection); 88 } 89 } 90 return rv; 91 } 92 93 private void newStackHarness(ConnectionID id, MessageTransport transport) { 94 final NetworkStackHarness harness; 95 harness = harnessFactory.createServerHarness(channelFactory, transport, new MessageTransportListener[] { this }); 96 harness.finalizeStack(); 97 Object previous = harnesses.put(id, harness); 98 if (previous != null) { throw new AssertionError ("previous is " + previous); } 99 } 100 101 private TransportHandshakeErrorHandler createHandshakeErrorHandler() { 102 return new TransportHandshakeErrorHandler() { 103 104 public void handleHandshakeError(TransportHandshakeErrorContext e) { 105 consoleLogger.info(e.getMessage()); 106 logger.info(e.getMessage()); 107 } 108 109 public void handleHandshakeError(TransportHandshakeErrorContext e, TransportHandshakeMessage m) { 110 logger.info(e.getMessage()); 111 } 112 113 }; 114 } 115 116 NetworkStackHarness removeNetworkStack(ConnectionID connectionId) { 117 return (NetworkStackHarness) harnesses.remove(connectionId); 118 } 119 120 123 public void notifyTransportConnected(MessageTransport transport) { 124 } 126 127 130 public void notifyTransportDisconnected(MessageTransport transport) { 131 } 137 138 private void close(ConnectionID connectionId) { 139 NetworkStackHarness harness = removeNetworkStack(connectionId); 140 if (harness == null) { throw new AssertionError ( 141 "Receive a transport closed event for a transport that isn't in the map :" 142 + connectionId); } 143 } 144 145 public void notifyTransportConnectAttempt(MessageTransport transport) { 146 } 148 149 153 public void notifyTransportClosed(MessageTransport transport) { 154 close(transport.getConnectionId()); 155 this.connectionPolicy.clientDisconnected(); 156 } 157 158 161 162 public TCProtocolAdaptor getInstance() { 163 MessageSink sink = new MessageSink(createHandshakeErrorHandler()); 164 return this.wireProtocolAdaptorFactory.newWireProtocolAdaptor(sink); 165 } 166 167 170 171 class MessageSink implements WireProtocolMessageSink { 172 private final TransportHandshakeErrorHandler handshakeErrorHandler; 173 private volatile boolean isSynReceived = false; 174 private volatile boolean isHandshakeError = false; 175 private volatile MessageTransport transport; 176 177 private MessageSink(TransportHandshakeErrorHandler handshakeErrorHandler) { 178 this.handshakeErrorHandler = handshakeErrorHandler; 179 } 180 181 public void putMessage(WireProtocolMessage message) { 182 if (!isSynReceived) { 183 synchronized (this) { 184 if (!isSynReceived) { 185 isSynReceived = true; 186 verifyAndHandleSyn(message); 187 message.recycle(); 188 return; 189 } 190 } 191 } 192 if (!isHandshakeError) { 193 this.transport.receiveTransportMessage(message); 194 } 195 } 196 197 private void verifyAndHandleSyn(WireProtocolMessage message) { 198 if (!verifySyn(message)) { 199 handleHandshakeError(new TransportHandshakeErrorContext("Expected a SYN message but received: " + message)); 200 } else { 201 try { 202 handleSyn((SynMessage) message); 203 } catch (StackNotFoundException e) { 204 handleHandshakeError(new TransportHandshakeErrorContext( 205 "Unable to find communications stack. " 206 + e.getMessage() 207 + ". This is usually caused by a client from a prior run trying to illegally reconnect to the server." 208 + " While that client is being rejected, everything else should proceed as normal. ", 209 e)); 210 } 211 } 212 } 213 214 private void handleHandshakeError(TransportHandshakeErrorContext ctxt) { 215 this.isHandshakeError = true; 216 this.handshakeErrorHandler.handleHandshakeError(ctxt); 217 } 218 219 private void handleSyn(SynMessage syn) throws StackNotFoundException { 220 ConnectionID connectionId = syn.getConnectionId(); 221 222 if (connectionId == null) { 223 sendSynAck(connectionId, new TransportHandshakeErrorContext("Invalid connection id: " + connectionId), syn 224 .getSource()); 225 this.isHandshakeError = true; 226 return; 227 } 228 229 this.transport = attachNewConnection(connectionId, syn.getSource()); 230 connectionId = this.transport.getConnectionId(); 231 sendSynAck(connectionId, syn.getSource()); 232 } 233 234 private boolean verifySyn(WireProtocolMessage message) { 235 return message instanceof TransportHandshakeMessage && ((TransportHandshakeMessage) message).isSyn(); 236 } 237 238 private void sendSynAck(ConnectionID connectionId, TCConnection source) { 239 sendSynAck(connectionId, null, source); 240 } 241 242 private void sendSynAck(ConnectionID connectionId, TransportHandshakeErrorContext errorContext, TCConnection source) { 243 TransportHandshakeMessage synAck; 244 boolean isError = (errorContext != null); 245 int maxConnections = connectionPolicy.getMaxConnections(); 246 connectionPolicy.clientConnected(); 247 boolean isMaxConnectionsExceeded = connectionPolicy.maxConnectionsExceeded(); 250 if (isError) { 251 synAck = handshakeMessageFactory.createSynAck(connectionId, errorContext, source, isMaxConnectionsExceeded, 252 maxConnections); 253 } else { 254 synAck = handshakeMessageFactory.createSynAck(connectionId, source, isMaxConnectionsExceeded, maxConnections); 255 } 256 sendMessage(synAck); 257 } 258 259 private void sendMessage(WireProtocolMessage message) { 260 transport.sendToConnection(message); 261 } 262 } 263 264 } 265 | Popular Tags |