1 5 package com.tc.net.protocol.tcm; 6 7 import com.tc.async.api.Sink; 8 import com.tc.async.impl.NullSink; 9 import com.tc.config.schema.dynamic.ConfigItem; 10 import com.tc.exception.TCRuntimeException; 11 import com.tc.logging.TCLogger; 12 import com.tc.logging.TCLogging; 13 import com.tc.net.TCSocketAddress; 14 import com.tc.net.core.ConfigBasedConnectionAddressProvider; 15 import com.tc.net.core.ConnectionAddressProvider; 16 import com.tc.net.core.ConnectionInfo; 17 import com.tc.net.core.Constants; 18 import com.tc.net.core.TCConnection; 19 import com.tc.net.core.TCConnectionManager; 20 import com.tc.net.core.TCConnectionManagerFactory; 21 import com.tc.net.core.TCListener; 22 import com.tc.net.protocol.NetworkStackHarness; 23 import com.tc.net.protocol.NetworkStackHarnessFactory; 24 import com.tc.net.protocol.transport.ClientMessageTransport; 25 import com.tc.net.protocol.transport.ConnectionID; 26 import com.tc.net.protocol.transport.ConnectionIDFactory; 27 import com.tc.net.protocol.transport.ConnectionPolicy; 28 import com.tc.net.protocol.transport.MessageTransport; 29 import com.tc.net.protocol.transport.MessageTransportFactory; 30 import com.tc.net.protocol.transport.MessageTransportListener; 31 import com.tc.net.protocol.transport.ServerMessageTransport; 32 import com.tc.net.protocol.transport.ServerStackProvider; 33 import com.tc.net.protocol.transport.TransportHandshakeErrorContext; 34 import com.tc.net.protocol.transport.TransportHandshakeErrorHandler; 35 import com.tc.net.protocol.transport.TransportHandshakeMessage; 36 import com.tc.net.protocol.transport.TransportHandshakeMessageFactory; 37 import com.tc.net.protocol.transport.TransportHandshakeMessageFactoryImpl; 38 import com.tc.net.protocol.transport.WireProtocolAdaptorFactoryImpl; 39 import com.tc.object.session.SessionProvider; 40 import com.tc.util.concurrent.SetOnceFlag; 41 42 import java.io.IOException ; 43 import java.util.HashSet ; 44 import java.util.List ; 45 import java.util.Set ; 46 47 52 public class CommunicationsManagerImpl implements CommunicationsManager { 53 private static final TCLogger logger = TCLogging.getLogger(CommunicationsManager.class); 54 55 private final SetOnceFlag shutdown = new SetOnceFlag(); 56 private final Set listeners = new HashSet (); 57 private final TCConnectionManager connectionManager; 58 private final boolean privateConnMgr; 59 private final NetworkStackHarnessFactory stackHarnessFactory; 60 private final TransportHandshakeMessageFactory transportHandshakeMessageFactory; 61 private final MessageMonitor monitor; 62 private final ConnectionPolicy connectionPolicy; 63 64 68 public CommunicationsManagerImpl(MessageMonitor monitor, NetworkStackHarnessFactory stackHarnessFactory, 69 ConnectionPolicy connectionPolicy) { 70 this(monitor, stackHarnessFactory, null, connectionPolicy); 71 } 72 73 80 public CommunicationsManagerImpl(MessageMonitor monitor, NetworkStackHarnessFactory stackHarnessFactory, 81 TCConnectionManager connMgr, ConnectionPolicy connectionPolicy) { 82 83 this.monitor = monitor; 84 this.transportHandshakeMessageFactory = new TransportHandshakeMessageFactoryImpl(); 85 this.connectionPolicy = connectionPolicy; 86 this.stackHarnessFactory = stackHarnessFactory; 87 privateConnMgr = (connMgr == null); 88 89 if (null == connMgr) { 90 this.connectionManager = new TCConnectionManagerFactory().getInstance(); 91 } else { 92 this.connectionManager = connMgr; 93 } 94 } 95 96 public TCConnectionManager getConnectionManager() { 97 return this.connectionManager; 98 } 99 100 public boolean isInShutdown() { 101 return shutdown.isSet(); 102 } 103 104 public void shutdown() { 105 if (shutdown.attemptSet()) { 106 if (privateConnMgr) { 107 connectionManager.shutdown(); 108 } 109 } else { 110 logger.warn("shutdown already started"); 111 } 112 } 113 114 public NetworkListener[] getAllListeners() { 115 synchronized (listeners) { 116 return (NetworkListener[]) listeners.toArray(new NetworkListener[listeners.size()]); 117 } 118 } 119 120 public ClientMessageChannel createClientChannel(final SessionProvider sessionProvider, final int maxReconnectTries, 121 String hostname, int port, final int timeout, 122 ConfigItem connectionInfoSource) { 123 126 final ConnectionAddressProvider provider = new ConfigBasedConnectionAddressProvider(connectionInfoSource); 127 128 ClientMessageChannelImpl rv = new ClientMessageChannelImpl(new ConnectionInfo(hostname, port), 129 new TCMessageFactoryImpl(sessionProvider, monitor), 130 new TCMessageRouterImpl()); 131 132 MessageTransportFactory transportFactory = new MessageTransportFactory() { 133 134 public MessageTransport createNewTransport() { 135 TransportHandshakeErrorHandler handshakeErrorHandler = new TransportHandshakeErrorHandler() { 136 137 public void handleHandshakeError(TransportHandshakeErrorContext e) { 138 System.err.println(e); 139 new TCRuntimeException("I'm crashing the client!").printStackTrace(); 140 try { 141 Thread.sleep(30 * 1000); 142 } catch (InterruptedException e1) { 143 e1.printStackTrace(); 144 } 145 System.exit(1); 146 } 147 148 public void handleHandshakeError(TransportHandshakeErrorContext e, TransportHandshakeMessage m) { 149 System.err.println(e); 150 System.err.println(m); 151 new TCRuntimeException("I'm crashing the client").printStackTrace(); 152 try { 153 Thread.sleep(30 * 1000); 154 } catch (InterruptedException e1) { 155 e1.printStackTrace(); 156 } 157 System.exit(1); 158 } 159 160 }; 161 162 return new ClientMessageTransport(maxReconnectTries, provider, timeout, connectionManager, 163 handshakeErrorHandler, transportHandshakeMessageFactory, 164 new WireProtocolAdaptorFactoryImpl()); 165 } 166 167 public MessageTransport createNewTransport(ConnectionID connectionID, TransportHandshakeErrorHandler handler, 168 TransportHandshakeMessageFactory handshakeMessageFactory, 169 List transportListeners) { 170 throw new AssertionError (); 171 } 172 173 public MessageTransport createNewTransport(ConnectionID connectionId, TCConnection connection, 174 TransportHandshakeErrorHandler handler, 175 TransportHandshakeMessageFactory handshakeMessageFactory, 176 List transportListeners) { 177 throw new AssertionError (); 178 } 179 180 }; 181 NetworkStackHarness stackHarness = this.stackHarnessFactory.clientClientHarness(transportFactory, rv, 182 new MessageTransportListener[0]); 183 184 stackHarness.finalizeStack(); 185 186 return rv; 187 } 188 189 192 public NetworkListener createListener(SessionProvider sessionProvider, TCSocketAddress addr, 193 boolean transportDisconnectRemovesChannel, 194 ConnectionIDFactory connectionIdFactory) { 195 return createListener(sessionProvider, addr, transportDisconnectRemovesChannel, 196 connectionIdFactory, true); 197 } 198 199 public NetworkListener createListener(SessionProvider sessionProvider, TCSocketAddress address, 200 boolean transportDisconnectRemovesChannel, 201 ConnectionIDFactory connectionIDFactory, Sink httpSink) { 202 return createListener(sessionProvider, address, transportDisconnectRemovesChannel, 203 connectionIDFactory, true, httpSink); 204 } 205 206 public NetworkListener createListener(SessionProvider sessionProvider, TCSocketAddress addr, 207 boolean transportDisconnectRemovesChannel, 208 ConnectionIDFactory connectionIdFactory, boolean reuseAddr) { 209 return createListener(sessionProvider, addr, transportDisconnectRemovesChannel, 210 connectionIdFactory, reuseAddr, new NullSink()); 211 } 212 213 216 private NetworkListener createListener(SessionProvider sessionProvider, TCSocketAddress addr, 217 boolean transportDisconnectRemovesChannel, 218 ConnectionIDFactory connectionIdFactory, boolean reuseAddr, Sink httpSink) { 219 if (shutdown.isSet()) { throw new IllegalStateException ("Comms manger shut down"); } 220 221 final TCMessageRouter msgRouter = new TCMessageRouterImpl(); 224 final TCMessageFactory msgFactory = new TCMessageFactoryImpl(sessionProvider, monitor); 225 final ServerMessageChannelFactory channelFactory = new ServerMessageChannelFactory() { 226 public MessageChannelInternal createNewChannel(ChannelID id) { 227 return new ServerMessageChannelImpl(id, msgRouter, msgFactory); 228 } 229 }; 230 231 final ChannelManagerImpl channelManager = new ChannelManagerImpl(transportDisconnectRemovesChannel, channelFactory); 232 233 return new NetworkListenerImpl(addr, this, channelManager, msgFactory, msgRouter, reuseAddr, 234 connectionIdFactory, httpSink); 235 } 236 237 TCListener createCommsListener(TCSocketAddress addr, final ServerMessageChannelFactory channelFactory, 238 boolean resueAddr, Set initialConnectionIDs, ConnectionIDFactory connectionIdFactory, 239 Sink httpSink) throws IOException { 240 241 MessageTransportFactory transportFactory = new MessageTransportFactory() { 242 243 public MessageTransport createNewTransport() { 244 throw new AssertionError (); 245 } 246 247 public MessageTransport createNewTransport(ConnectionID connectionID, TransportHandshakeErrorHandler handler, 248 TransportHandshakeMessageFactory handshakeMessageFactory, 249 List transportListeners) { 250 MessageTransport rv = new ServerMessageTransport(connectionID, handler, handshakeMessageFactory); 251 rv.addTransportListeners(transportListeners); 252 return rv; 253 } 254 255 public MessageTransport createNewTransport(ConnectionID connectionId, TCConnection connection, 256 TransportHandshakeErrorHandler handler, 257 TransportHandshakeMessageFactory handshakeMessageFactory, 258 List transportListeners) { 259 MessageTransport rv = new ServerMessageTransport(connectionId, connection, handler, handshakeMessageFactory); 260 rv.addTransportListeners(transportListeners); 261 return rv; 262 } 263 264 }; 265 266 ServerStackProvider stackProvider = new ServerStackProvider(TCLogging.getLogger(ServerStackProvider.class), 267 initialConnectionIDs, stackHarnessFactory, 268 channelFactory, transportFactory, 269 this.transportHandshakeMessageFactory, 270 connectionIdFactory, this.connectionPolicy, 271 new WireProtocolAdaptorFactoryImpl(httpSink)); 272 return connectionManager.createListener(addr, stackProvider, Constants.DEFAULT_ACCEPT_QUEUE_DEPTH, resueAddr); 273 } 274 275 void registerListener(NetworkListener lsnr) { 276 synchronized (listeners) { 277 boolean added = listeners.add(lsnr); 278 279 if (!added) { 280 logger.warn("replaced an existing listener in the listener map"); 281 } 282 } 283 } 284 285 void unregisterListener(NetworkListener lsnr) { 286 synchronized (listeners) { 287 listeners.remove(lsnr); 288 } 289 } 290 291 } 292 | Popular Tags |