KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > net > protocol > tcm > CommunicationsManagerImpl


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.net.protocol.tcm;
6
7 import com.tc.async.api.Sink;
8 import com.tc.async.impl.NullSink;
9 import com.tc.config.schema.dynamic.ConfigItem;
10 import com.tc.exception.TCRuntimeException;
11 import com.tc.logging.TCLogger;
12 import com.tc.logging.TCLogging;
13 import com.tc.net.TCSocketAddress;
14 import com.tc.net.core.ConfigBasedConnectionAddressProvider;
15 import com.tc.net.core.ConnectionAddressProvider;
16 import com.tc.net.core.ConnectionInfo;
17 import com.tc.net.core.Constants;
18 import com.tc.net.core.TCConnection;
19 import com.tc.net.core.TCConnectionManager;
20 import com.tc.net.core.TCConnectionManagerFactory;
21 import com.tc.net.core.TCListener;
22 import com.tc.net.protocol.NetworkStackHarness;
23 import com.tc.net.protocol.NetworkStackHarnessFactory;
24 import com.tc.net.protocol.transport.ClientMessageTransport;
25 import com.tc.net.protocol.transport.ConnectionID;
26 import com.tc.net.protocol.transport.ConnectionIDFactory;
27 import com.tc.net.protocol.transport.ConnectionPolicy;
28 import com.tc.net.protocol.transport.MessageTransport;
29 import com.tc.net.protocol.transport.MessageTransportFactory;
30 import com.tc.net.protocol.transport.MessageTransportListener;
31 import com.tc.net.protocol.transport.ServerMessageTransport;
32 import com.tc.net.protocol.transport.ServerStackProvider;
33 import com.tc.net.protocol.transport.TransportHandshakeErrorContext;
34 import com.tc.net.protocol.transport.TransportHandshakeErrorHandler;
35 import com.tc.net.protocol.transport.TransportHandshakeMessage;
36 import com.tc.net.protocol.transport.TransportHandshakeMessageFactory;
37 import com.tc.net.protocol.transport.TransportHandshakeMessageFactoryImpl;
38 import com.tc.net.protocol.transport.WireProtocolAdaptorFactoryImpl;
39 import com.tc.object.session.SessionProvider;
40 import com.tc.util.concurrent.SetOnceFlag;
41
42 import java.io.IOException JavaDoc;
43 import java.util.HashSet JavaDoc;
44 import java.util.List JavaDoc;
45 import java.util.Set JavaDoc;
46
47 /**
48  * Communications manager for setting up listners and creating client connections
49  *
50  * @author teck
51  */

52 public class CommunicationsManagerImpl implements CommunicationsManager {
53   private static final TCLogger logger = TCLogging.getLogger(CommunicationsManager.class);
54
55   private final SetOnceFlag shutdown = new SetOnceFlag();
56   private final Set JavaDoc listeners = new HashSet JavaDoc();
57   private final TCConnectionManager connectionManager;
58   private final boolean privateConnMgr;
59   private final NetworkStackHarnessFactory stackHarnessFactory;
60   private final TransportHandshakeMessageFactory transportHandshakeMessageFactory;
61   private final MessageMonitor monitor;
62   private final ConnectionPolicy connectionPolicy;
63
64   /**
65    * Create a communications manager. This implies that one or more network handling threads will be started on your
66    * behalf. As such, you should not be instantiating one of these per connection for instance.
67    */

68   public CommunicationsManagerImpl(MessageMonitor monitor, NetworkStackHarnessFactory stackHarnessFactory,
69                                    ConnectionPolicy connectionPolicy) {
70     this(monitor, stackHarnessFactory, null, connectionPolicy);
71   }
72
73   /**
74    * Create a comms manager with the given connection manager. This cstr is mostly for testing, or in the event that you
75    * actually want to use an explicit connection manager
76    *
77    * @param connMgr the connection manager to use
78    * @param serverDescriptors
79    */

80   public CommunicationsManagerImpl(MessageMonitor monitor, NetworkStackHarnessFactory stackHarnessFactory,
81                                    TCConnectionManager connMgr, ConnectionPolicy connectionPolicy) {
82
83     this.monitor = monitor;
84     this.transportHandshakeMessageFactory = new TransportHandshakeMessageFactoryImpl();
85     this.connectionPolicy = connectionPolicy;
86     this.stackHarnessFactory = stackHarnessFactory;
87     privateConnMgr = (connMgr == null);
88
89     if (null == connMgr) {
90       this.connectionManager = new TCConnectionManagerFactory().getInstance();
91     } else {
92       this.connectionManager = connMgr;
93     }
94   }
95
96   public TCConnectionManager getConnectionManager() {
97     return this.connectionManager;
98   }
99
100   public boolean isInShutdown() {
101     return shutdown.isSet();
102   }
103
104   public void shutdown() {
105     if (shutdown.attemptSet()) {
106       if (privateConnMgr) {
107         connectionManager.shutdown();
108       }
109     } else {
110       logger.warn("shutdown already started");
111     }
112   }
113
114   public NetworkListener[] getAllListeners() {
115     synchronized (listeners) {
116       return (NetworkListener[]) listeners.toArray(new NetworkListener[listeners.size()]);
117     }
118   }
119
120   public ClientMessageChannel createClientChannel(final SessionProvider sessionProvider, final int maxReconnectTries,
121                                                   String JavaDoc hostname, int port, final int timeout,
122                                                   ConfigItem connectionInfoSource) {
123     // XXX: maxReconnectTries MUST be non-zero if we have a
124
// once and only once protocol stack.
125

126     final ConnectionAddressProvider provider = new ConfigBasedConnectionAddressProvider(connectionInfoSource);
127
128     ClientMessageChannelImpl rv = new ClientMessageChannelImpl(new ConnectionInfo(hostname, port),
129                                                                new TCMessageFactoryImpl(sessionProvider, monitor),
130                                                                new TCMessageRouterImpl());
131
132     MessageTransportFactory transportFactory = new MessageTransportFactory() {
133
134       public MessageTransport createNewTransport() {
135         TransportHandshakeErrorHandler handshakeErrorHandler = new TransportHandshakeErrorHandler() {
136
137           public void handleHandshakeError(TransportHandshakeErrorContext e) {
138             System.err.println(e);
139             new TCRuntimeException("I'm crashing the client!").printStackTrace();
140             try {
141               Thread.sleep(30 * 1000);
142             } catch (InterruptedException JavaDoc e1) {
143               e1.printStackTrace();
144             }
145             System.exit(1);
146           }
147
148           public void handleHandshakeError(TransportHandshakeErrorContext e, TransportHandshakeMessage m) {
149             System.err.println(e);
150             System.err.println(m);
151             new TCRuntimeException("I'm crashing the client").printStackTrace();
152             try {
153               Thread.sleep(30 * 1000);
154             } catch (InterruptedException JavaDoc e1) {
155               e1.printStackTrace();
156             }
157             System.exit(1);
158           }
159
160         };
161
162         return new ClientMessageTransport(maxReconnectTries, provider, timeout, connectionManager,
163                                           handshakeErrorHandler, transportHandshakeMessageFactory,
164                                           new WireProtocolAdaptorFactoryImpl());
165       }
166
167       public MessageTransport createNewTransport(ConnectionID connectionID, TransportHandshakeErrorHandler handler,
168                                                  TransportHandshakeMessageFactory handshakeMessageFactory,
169                                                  List JavaDoc transportListeners) {
170         throw new AssertionError JavaDoc();
171       }
172
173       public MessageTransport createNewTransport(ConnectionID connectionId, TCConnection connection,
174                                                  TransportHandshakeErrorHandler handler,
175                                                  TransportHandshakeMessageFactory handshakeMessageFactory,
176                                                  List JavaDoc transportListeners) {
177         throw new AssertionError JavaDoc();
178       }
179
180     };
181     NetworkStackHarness stackHarness = this.stackHarnessFactory.clientClientHarness(transportFactory, rv,
182                                                                                     new MessageTransportListener[0]);
183
184     stackHarness.finalizeStack();
185
186     return rv;
187   }
188
189   /**
190    * Creates a network listener with a default network stack.
191    */

192   public NetworkListener createListener(SessionProvider sessionProvider, TCSocketAddress addr,
193                                         boolean transportDisconnectRemovesChannel,
194                                         ConnectionIDFactory connectionIdFactory) {
195     return createListener(sessionProvider, addr, transportDisconnectRemovesChannel,
196                           connectionIdFactory, true);
197   }
198
199   public NetworkListener createListener(SessionProvider sessionProvider, TCSocketAddress address,
200                                         boolean transportDisconnectRemovesChannel,
201                                         ConnectionIDFactory connectionIDFactory, Sink httpSink) {
202     return createListener(sessionProvider, address, transportDisconnectRemovesChannel,
203                           connectionIDFactory, true, httpSink);
204   }
205
206   public NetworkListener createListener(SessionProvider sessionProvider, TCSocketAddress addr,
207                                         boolean transportDisconnectRemovesChannel,
208                                         ConnectionIDFactory connectionIdFactory, boolean reuseAddr) {
209     return createListener(sessionProvider, addr, transportDisconnectRemovesChannel,
210                           connectionIdFactory, reuseAddr, new NullSink());
211   }
212
213   /**
214    * Creates a network listener with a default network stack.
215    */

216   private NetworkListener createListener(SessionProvider sessionProvider, TCSocketAddress addr,
217                                         boolean transportDisconnectRemovesChannel,
218                                         ConnectionIDFactory connectionIdFactory, boolean reuseAddr, Sink httpSink) {
219     if (shutdown.isSet()) { throw new IllegalStateException JavaDoc("Comms manger shut down"); }
220
221     // The idea here is that someday we might want to pass in a custom channel factory. The reason you might want to do
222
// that is so thay you can control the actual class of the channels created off this listener
223
final TCMessageRouter msgRouter = new TCMessageRouterImpl();
224     final TCMessageFactory msgFactory = new TCMessageFactoryImpl(sessionProvider, monitor);
225     final ServerMessageChannelFactory channelFactory = new ServerMessageChannelFactory() {
226       public MessageChannelInternal createNewChannel(ChannelID id) {
227         return new ServerMessageChannelImpl(id, msgRouter, msgFactory);
228       }
229     };
230
231     final ChannelManagerImpl channelManager = new ChannelManagerImpl(transportDisconnectRemovesChannel, channelFactory);
232
233     return new NetworkListenerImpl(addr, this, channelManager, msgFactory, msgRouter, reuseAddr,
234                                    connectionIdFactory, httpSink);
235   }
236
237   TCListener createCommsListener(TCSocketAddress addr, final ServerMessageChannelFactory channelFactory,
238                                  boolean resueAddr, Set JavaDoc initialConnectionIDs, ConnectionIDFactory connectionIdFactory,
239                                  Sink httpSink) throws IOException JavaDoc {
240
241     MessageTransportFactory transportFactory = new MessageTransportFactory() {
242
243       public MessageTransport createNewTransport() {
244         throw new AssertionError JavaDoc();
245       }
246
247       public MessageTransport createNewTransport(ConnectionID connectionID, TransportHandshakeErrorHandler handler,
248                                                  TransportHandshakeMessageFactory handshakeMessageFactory,
249                                                  List JavaDoc transportListeners) {
250         MessageTransport rv = new ServerMessageTransport(connectionID, handler, handshakeMessageFactory);
251         rv.addTransportListeners(transportListeners);
252         return rv;
253       }
254
255       public MessageTransport createNewTransport(ConnectionID connectionId, TCConnection connection,
256                                                  TransportHandshakeErrorHandler handler,
257                                                  TransportHandshakeMessageFactory handshakeMessageFactory,
258                                                  List JavaDoc transportListeners) {
259         MessageTransport rv = new ServerMessageTransport(connectionId, connection, handler, handshakeMessageFactory);
260         rv.addTransportListeners(transportListeners);
261         return rv;
262       }
263
264     };
265
266     ServerStackProvider stackProvider = new ServerStackProvider(TCLogging.getLogger(ServerStackProvider.class),
267                                                                 initialConnectionIDs, stackHarnessFactory,
268                                                                 channelFactory, transportFactory,
269                                                                 this.transportHandshakeMessageFactory,
270                                                                 connectionIdFactory, this.connectionPolicy,
271                                                                 new WireProtocolAdaptorFactoryImpl(httpSink));
272     return connectionManager.createListener(addr, stackProvider, Constants.DEFAULT_ACCEPT_QUEUE_DEPTH, resueAddr);
273   }
274
275   void registerListener(NetworkListener lsnr) {
276     synchronized (listeners) {
277       boolean added = listeners.add(lsnr);
278
279       if (!added) {
280         logger.warn("replaced an existing listener in the listener map");
281       }
282     }
283   }
284
285   void unregisterListener(NetworkListener lsnr) {
286     synchronized (listeners) {
287       listeners.remove(lsnr);
288     }
289   }
290
291 }
292
Popular Tags