KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > net > protocol > transport > ServerStackProvider


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.net.protocol.transport;
6
7 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
8
9 import com.tc.logging.CustomerLogging;
10 import com.tc.logging.TCLogger;
11 import com.tc.net.core.TCConnection;
12 import com.tc.net.protocol.NetworkStackHarness;
13 import com.tc.net.protocol.NetworkStackHarnessFactory;
14 import com.tc.net.protocol.ProtocolAdaptorFactory;
15 import com.tc.net.protocol.StackNotFoundException;
16 import com.tc.net.protocol.TCProtocolAdaptor;
17 import com.tc.net.protocol.tcm.ServerMessageChannelFactory;
18 import com.tc.util.Assert;
19
20 import java.util.ArrayList JavaDoc;
21 import java.util.Iterator JavaDoc;
22 import java.util.List JavaDoc;
23 import java.util.Map JavaDoc;
24 import java.util.Set JavaDoc;
25
26 /**
27  * Provides network statcks on the server side
28  */

29 public class ServerStackProvider implements NetworkStackProvider, MessageTransportListener, ProtocolAdaptorFactory {
30
31   private final Map harnesses = new ConcurrentHashMap();
32   private final NetworkStackHarnessFactory harnessFactory;
33   private final ServerMessageChannelFactory channelFactory;
34   private final TransportHandshakeMessageFactory handshakeMessageFactory;
35   private final ConnectionIDFactory connectionIdFactory;
36   private final ConnectionPolicy connectionPolicy;
37   private final WireProtocolAdaptorFactory wireProtocolAdaptorFactory;
38   private final MessageTransportFactory messageTransportFactory;
39   private final List JavaDoc transportListeners = new ArrayList JavaDoc(1);
40   private final TCLogger logger;
41   private final TCLogger consoleLogger = CustomerLogging.getConsoleLogger();
42
43   public ServerStackProvider(TCLogger logger, Set JavaDoc initialConnectionIDs, NetworkStackHarnessFactory harnessFactory,
44                              ServerMessageChannelFactory channelFactory,
45                              MessageTransportFactory messageTransportFactory,
46                              TransportHandshakeMessageFactory handshakeMessageFactory,
47                              ConnectionIDFactory connectionIdFactory, ConnectionPolicy connectionPolicy,
48                              WireProtocolAdaptorFactory wireProtocolAdaptorFactory) {
49     this.messageTransportFactory = messageTransportFactory;
50     this.connectionPolicy = connectionPolicy;
51     this.wireProtocolAdaptorFactory = wireProtocolAdaptorFactory;
52     Assert.assertNotNull(harnessFactory);
53     this.harnessFactory = harnessFactory;
54     this.channelFactory = channelFactory;
55     this.handshakeMessageFactory = handshakeMessageFactory;
56     this.connectionIdFactory = connectionIdFactory;
57     this.transportListeners.add(this);
58     this.logger = logger;
59     for (Iterator JavaDoc i = initialConnectionIDs.iterator(); i.hasNext();) {
60       ConnectionID connectionID = (ConnectionID) i.next();
61       logger.info("Preparing comms stack for previously connected client: " + connectionID);
62       newStackHarness(connectionID, messageTransportFactory.createNewTransport(connectionID,
63                                                                                createHandshakeErrorHandler(),
64                                                                                handshakeMessageFactory,
65                                                                                transportListeners));
66     }
67   }
68
69   public MessageTransport attachNewConnection(ConnectionID connectionId, TCConnection connection)
70       throws StackNotFoundException {
71     Assert.assertNotNull(connection);
72
73     final NetworkStackHarness harness;
74     final MessageTransport rv;
75     if (connectionId.isNull()) {
76       connectionId = connectionIdFactory.nextConnectionId();
77
78       rv = messageTransportFactory.createNewTransport(connectionId, connection, createHandshakeErrorHandler(),
79                                                       handshakeMessageFactory, transportListeners);
80       newStackHarness(connectionId, rv);
81     } else {
82       harness = (NetworkStackHarness) harnesses.get(connectionId);
83
84       if (harness == null) {
85         throw new StackNotFoundException(connectionId);
86       } else {
87         rv = harness.attachNewConnection(connection);
88       }
89     }
90     return rv;
91   }
92
93   private void newStackHarness(ConnectionID id, MessageTransport transport) {
94     final NetworkStackHarness harness;
95     harness = harnessFactory.createServerHarness(channelFactory, transport, new MessageTransportListener[] { this });
96     harness.finalizeStack();
97     Object JavaDoc previous = harnesses.put(id, harness);
98     if (previous != null) { throw new AssertionError JavaDoc("previous is " + previous); }
99   }
100
101   private TransportHandshakeErrorHandler createHandshakeErrorHandler() {
102     return new TransportHandshakeErrorHandler() {
103
104       public void handleHandshakeError(TransportHandshakeErrorContext e) {
105         consoleLogger.info(e.getMessage());
106         logger.info(e.getMessage());
107       }
108
109       public void handleHandshakeError(TransportHandshakeErrorContext e, TransportHandshakeMessage m) {
110         logger.info(e.getMessage());
111       }
112
113     };
114   }
115
116   NetworkStackHarness removeNetworkStack(ConnectionID connectionId) {
117     return (NetworkStackHarness) harnesses.remove(connectionId);
118   }
119
120   /*********************************************************************************************************************
121    * MessageTransportListener methods.
122    */

123   public void notifyTransportConnected(MessageTransport transport) {
124     // don't care
125
}
126
127   /**
128    * A client disconnected.
129    */

130   public void notifyTransportDisconnected(MessageTransport transport) {
131     // Currenly we dont care about this event here. In AbstractMessageChannel in the server, this event closes the
132
// channel
133
// so effectively a disconnected transport means a closed channel in the server. When we later implement clients
134
// reconnect
135
// this will change and this will trigger a reconnect window for that client here.
136
}
137
138   private void close(ConnectionID connectionId) {
139     NetworkStackHarness harness = removeNetworkStack(connectionId);
140     if (harness == null) { throw new AssertionError JavaDoc(
141                                                     "Receive a transport closed event for a transport that isn't in the map :"
142                                                         + connectionId); }
143   }
144
145   public void notifyTransportConnectAttempt(MessageTransport transport) {
146     // don't care
147
}
148
149   /**
150    * The connection was closed. The client is never allowed to reconnect. Removes stack associated with the given
151    * transport from the map of managed stacks.
152    */

153   public void notifyTransportClosed(MessageTransport transport) {
154     close(transport.getConnectionId());
155     this.connectionPolicy.clientDisconnected();
156   }
157
158   /*********************************************************************************************************************
159    * ProtocolAdaptorFactory interface
160    */

161
162   public TCProtocolAdaptor getInstance() {
163     MessageSink sink = new MessageSink(createHandshakeErrorHandler());
164     return this.wireProtocolAdaptorFactory.newWireProtocolAdaptor(sink);
165   }
166
167   /*********************************************************************************************************************
168    * private stuff
169    */

170
171   class MessageSink implements WireProtocolMessageSink {
172     private final TransportHandshakeErrorHandler handshakeErrorHandler;
173     private volatile boolean isSynReceived = false;
174     private volatile boolean isHandshakeError = false;
175     private volatile MessageTransport transport;
176
177     private MessageSink(TransportHandshakeErrorHandler handshakeErrorHandler) {
178       this.handshakeErrorHandler = handshakeErrorHandler;
179     }
180
181     public void putMessage(WireProtocolMessage message) {
182       if (!isSynReceived) {
183         synchronized (this) {
184           if (!isSynReceived) {
185             isSynReceived = true;
186             verifyAndHandleSyn(message);
187             message.recycle();
188             return;
189           }
190         }
191       }
192       if (!isHandshakeError) {
193         this.transport.receiveTransportMessage(message);
194       }
195     }
196
197     private void verifyAndHandleSyn(WireProtocolMessage message) {
198       if (!verifySyn(message)) {
199         handleHandshakeError(new TransportHandshakeErrorContext("Expected a SYN message but received: " + message));
200       } else {
201         try {
202           handleSyn((SynMessage) message);
203         } catch (StackNotFoundException e) {
204           handleHandshakeError(new TransportHandshakeErrorContext(
205                                                                   "Unable to find communications stack. "
206                                                                       + e.getMessage()
207                                                                       + ". This is usually caused by a client from a prior run trying to illegally reconnect to the server."
208                                                                       + " While that client is being rejected, everything else should proceed as normal. ",
209                                                                   e));
210         }
211       }
212     }
213
214     private void handleHandshakeError(TransportHandshakeErrorContext ctxt) {
215       this.isHandshakeError = true;
216       this.handshakeErrorHandler.handleHandshakeError(ctxt);
217     }
218
219     private void handleSyn(SynMessage syn) throws StackNotFoundException {
220       ConnectionID connectionId = syn.getConnectionId();
221
222       if (connectionId == null) {
223         sendSynAck(connectionId, new TransportHandshakeErrorContext("Invalid connection id: " + connectionId), syn
224             .getSource());
225         this.isHandshakeError = true;
226         return;
227       }
228
229       this.transport = attachNewConnection(connectionId, syn.getSource());
230       connectionId = this.transport.getConnectionId();
231       sendSynAck(connectionId, syn.getSource());
232     }
233
234     private boolean verifySyn(WireProtocolMessage message) {
235       return message instanceof TransportHandshakeMessage && ((TransportHandshakeMessage) message).isSyn();
236     }
237
238     private void sendSynAck(ConnectionID connectionId, TCConnection source) {
239       sendSynAck(connectionId, null, source);
240     }
241
242     private void sendSynAck(ConnectionID connectionId, TransportHandshakeErrorContext errorContext, TCConnection source) {
243       TransportHandshakeMessage synAck;
244       boolean isError = (errorContext != null);
245       int maxConnections = connectionPolicy.getMaxConnections();
246       connectionPolicy.clientConnected();
247       // NOTE: There's a race here which should be ok, since it doesn't matter which client gets told there are
248
// no more connections left...
249
boolean isMaxConnectionsExceeded = connectionPolicy.maxConnectionsExceeded();
250       if (isError) {
251         synAck = handshakeMessageFactory.createSynAck(connectionId, errorContext, source, isMaxConnectionsExceeded,
252                                                       maxConnections);
253       } else {
254         synAck = handshakeMessageFactory.createSynAck(connectionId, source, isMaxConnectionsExceeded, maxConnections);
255       }
256       sendMessage(synAck);
257     }
258
259     private void sendMessage(WireProtocolMessage message) {
260       transport.sendToConnection(message);
261     }
262   }
263
264 }
265
Popular Tags