KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > net > protocol > transport > ClientMessageTransport


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
3  */

4 package com.tc.net.protocol.transport;
5
6 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
7
8 import com.tc.config.schema.dynamic.FixedValueConfigItem;
9 import com.tc.exception.ImplementMe;
10 import com.tc.exception.TCInternalError;
11 import com.tc.exception.TCRuntimeException;
12 import com.tc.logging.TCLogger;
13 import com.tc.logging.TCLogging;
14 import com.tc.net.MaxConnectionsExceededException;
15 import com.tc.net.core.ConfigBasedConnectionAddressProvider;
16 import com.tc.net.core.ConnectionAddressProvider;
17 import com.tc.net.core.ConnectionInfo;
18 import com.tc.net.core.TCConnection;
19 import com.tc.net.core.TCConnectionManager;
20 import com.tc.net.core.event.TCConnectionEvent;
21 import com.tc.net.protocol.NetworkStackID;
22 import com.tc.net.protocol.TCNetworkMessage;
23 import com.tc.net.protocol.TCProtocolAdaptor;
24 import com.tc.util.Assert;
25 import com.tc.util.TCTimeoutException;
26 import com.tc.util.concurrent.TCExceptionResultException;
27 import com.tc.util.concurrent.TCFuture;
28
29 import java.io.IOException JavaDoc;
30 import java.util.List JavaDoc;
31
32 /**
33  * Client implementation of the transport network layer.
34  */

35 public class ClientMessageTransport extends MessageTransportBase {
36   private static final TCLogger logger = TCLogging.getLogger(ClientMessageTransport.class);
37   private static final long SYN_ACK_TIMEOUT = 120000; // 2 minutes timeout
38
private final int maxReconnectTries;
39   private final ClientConnectionEstablisher connectionEstablisher;
40   private boolean wasOpened = false;
41   private TCFuture waitForSynAckResult;
42   private final ConnectionAddressProvider connAddressProvider;
43   private final WireProtocolAdaptorFactory wireProtocolAdaptorFactory;
44   private final SynchronizedBoolean isOpening = new SynchronizedBoolean(false);
45
46   /**
47    * Constructor for when you want a transport that isn't connected yet (e.g., in a client). This constructor will
48    * create an unopened MessageTransport.
49    *
50    * @param commsManager CommmunicationsManager
51    */

52
53   public ClientMessageTransport(int maxReconnectTries, ConnectionInfo connInfo, int timeout,
54                                 TCConnectionManager connManager, TransportHandshakeErrorHandler handshakeErrorHandler,
55                                 TransportHandshakeMessageFactory messageFactory,
56                                 WireProtocolAdaptorFactory wireProtocolAdaptorFactory) {
57     // FIXME 2005-12-08 andrew -- This (usage of a ConfigBasedConnectionAddressProvider with a fixed value here) seems
58
// like a big hack. However, because it's not clear to me exactly what the semantics of the object passed in here
59
// should be, this is the safest thing for me to do right now.
60
this(maxReconnectTries,
61          new ConfigBasedConnectionAddressProvider(new FixedValueConfigItem(new ConnectionInfo[] { connInfo })),
62          timeout, connManager, handshakeErrorHandler, messageFactory, wireProtocolAdaptorFactory);
63   }
64
65   /**
66    * Constructor for when you want a transport that isn't connected yet (e.g., in a client). This constructor will
67    * create an unopened MessageTransport.
68    *
69    * @param commsManager CommmunicationsManager
70    */

71   public ClientMessageTransport(int maxReconnectTries, ConnectionAddressProvider connInfoProvider, int timeout,
72                                 TCConnectionManager connManager, TransportHandshakeErrorHandler handshakeErrorHandler,
73                                 TransportHandshakeMessageFactory messageFactory,
74                                 WireProtocolAdaptorFactory wireProtocolAdaptorFactory) {
75
76     super(MessageTransportState.STATE_START, handshakeErrorHandler, messageFactory, false, logger);
77     this.maxReconnectTries = maxReconnectTries;
78     this.connAddressProvider = connInfoProvider;
79     this.wireProtocolAdaptorFactory = wireProtocolAdaptorFactory;
80
81     this.connectionEstablisher = new ClientConnectionEstablisher(this, connManager, connAddressProvider, logger,
82                                                                  maxReconnectTries, timeout);
83   }
84
85   /**
86    * Blocking open. Causes a connection to be made. Will throw exceptions if the connect fails.
87    *
88    * @throws TCTimeoutException
89    * @throws IOException
90    * @throws TCTimeoutException
91    * @throws MaxConnectionsExceededException
92    */

93   public NetworkStackID open() throws TCTimeoutException, IOException JavaDoc, MaxConnectionsExceededException {
94     // XXX: This extra boolean flag is dumb, but it's here because the close event can show up
95
// while the lock on isOpen is held here. That will cause a deadlock because the close event is thrown on the
96
// comms thread which means that the handshake messages can't be sent.
97
// The state machine here needs to be rationalized.
98
isOpening.set(true);
99     synchronized (isOpen) {
100       Assert.eval("can't open an already open transport", !isOpen.get());
101       try {
102         connectionEstablisher.open();
103         HandshakeResult result = handShake();
104         if (result.isMaxConnectionsExceeded()) {
105           // Hack to make the connection clear
106
// but don't do all the gunk around reconnect
107
// clean this up
108
List JavaDoc tl = this.getTransportListeners();
109           this.removeTransportListeners();
110           clearConnection();
111           this.addTransportListeners(tl);
112           status.reset();
113           throw new MaxConnectionsExceededException("Maximum number of client connections exceeded: "
114                                                     + result.maxConnections());
115         }
116         Assert.eval(!this.connectionId.isNull());
117         isOpen.set(true);
118         wasOpened = true;
119         sendAck();
120         return new NetworkStackID(this.connectionId.getChannelID());
121       } catch (TCTimeoutException e) {
122         status.reset();
123         throw e;
124       } catch (IOException JavaDoc e) {
125         status.reset();
126         throw e;
127       } finally {
128         isOpening.set(false);
129       }
130     }
131   }
132
133   /**
134    * Returns true if the MessageTransport was ever in an open state.
135    */

136   public boolean wasOpened() {
137     return wasOpened;
138   }
139
140   public boolean isOpen() {
141     return !isOpening.get() && !isOpen.get();
142   }
143
144   // TODO :: come back
145
public void closeEvent(TCConnectionEvent event) {
146
147     if (isOpen()) return;
148
149     TCConnection src = event.getSource();
150     Assert.assertSame(getConnection(), src);
151
152     if (!(maxReconnectTries == 0)) {
153       if (logger.isDebugEnabled()) {
154         logger.debug("Caught connection close event: " + event);
155       }
156       status.reset();
157       fireTransportDisconnectedEvent(); // This will make the connection establisher to try and reconnect.
158
} else {
159       super.closeEvent(event);
160
161       synchronized (status) {
162         if (!status.isEnd()) status.end();
163       }
164     }
165   }
166
167   protected void receiveTransportMessageImpl(WireProtocolMessage message) {
168     synchronized (status) {
169       if (status.isSynSent()) {
170         handleSynAck(message);
171         message.recycle();
172         return;
173       }
174     }
175     super.receiveToReceiveLayer(message);
176   }
177
178   private void handleSynAck(WireProtocolMessage message) {
179     if (!verifySynAck(message)) {
180       handleHandshakeError(new TransportHandshakeErrorContext(
181                                                               "Received a message that was not a SYN_ACK while waiting for SYN_ACK: "
182                                                                   + message));
183     } else {
184       SynAckMessage synAck = (SynAckMessage) message;
185       if (synAck.hasErrorContext()) { throw new ImplementMe(synAck.getErrorContext()); }
186
187       if (connectionId != null && !ConnectionID.NULL_ID.equals(connectionId)) {
188         // This is a reconnect
189
Assert.eval(connectionId.equals(synAck.getConnectionId()));
190       }
191       if (!synAck.isMaxConnectionsExceeded()) {
192         this.connectionId = synAck.getConnectionId();
193
194         Assert.assertNotNull("Connection id from the server was null!", this.connectionId);
195         Assert.eval(!ConnectionID.NULL_ID.equals(this.connectionId));
196         Assert.assertNotNull(this.waitForSynAckResult);
197       }
198
199       this.waitForSynAckResult.set(synAck);
200     }
201
202     return;
203   }
204
205   private boolean verifySynAck(TCNetworkMessage message) {
206     // XXX: yuck.
207
return message instanceof TransportHandshakeMessage && ((TransportHandshakeMessage) message).isSynAck();
208   }
209
210   /**
211    * Builds a protocol stack and tries to make a connection. This is a blocking call.
212    *
213    * @throws TCTimeoutException
214    * @throws MaxConnectionsExceededException
215    * @throws IOException
216    */

217   HandshakeResult handShake() throws TCTimeoutException {
218     sendSyn();
219     SynAckMessage synAck = waitForSynAck();
220     return new HandshakeResult(synAck.isMaxConnectionsExceeded(), synAck.getMaxConnections());
221   }
222
223   private SynAckMessage waitForSynAck() throws TCTimeoutException {
224     try {
225       SynAckMessage synAck = (SynAckMessage) waitForSynAckResult.get(SYN_ACK_TIMEOUT);
226       return synAck;
227     } catch (InterruptedException JavaDoc e) {
228       throw new TCRuntimeException(e);
229     } catch (TCExceptionResultException e) {
230       throw new TCInternalError(e);
231     }
232   }
233
234   private void sendSyn() {
235     synchronized (status) {
236       if (status.isEstablished() || status.isSynSent()) { throw new AssertionError JavaDoc(" ERROR !!! " + status); }
237       waitForSynAckResult = new TCFuture(status);
238       TransportHandshakeMessage syn = this.messageFactory.createSyn(this.connectionId, getConnection());
239       // send syn message
240
this.sendToConnection(syn);
241       this.status.synSent();
242     }
243   }
244
245   private void sendAck() {
246     synchronized (status) {
247       Assert.eval(status.isSynSent());
248       TransportHandshakeMessage ack = this.messageFactory.createAck(this.connectionId, getConnection());
249       // send ack message
250
this.sendToConnection(ack);
251       this.status.established();
252       fireTransportConnectedEvent();
253     }
254   }
255
256   void reconnect() throws Exception JavaDoc {
257     Assert.eval(!isConnected());
258     try {
259       HandshakeResult result = handShake();
260       sendAck();
261       if (result.isMaxConnectionsExceeded()) {
262         close();
263         throw new MaxConnectionsExceededException(getMaxConnectionsExceededMessage(result.maxConnections()));
264       }
265     } catch (Exception JavaDoc t) {
266       status.reset();
267       throw t;
268     }
269   }
270
271   private String JavaDoc getMaxConnectionsExceededMessage(int maxConnections) {
272     return "Maximum number of client connections exceeded: " + maxConnections;
273   }
274
275   TCProtocolAdaptor getProtocolAdapter() {
276     return wireProtocolAdaptorFactory.newWireProtocolAdaptor(new WireProtocolMessageSink() {
277       public void putMessage(WireProtocolMessage message) {
278         receiveTransportMessage(message);
279       }
280     });
281   }
282
283   void endIfDisconnected() {
284     synchronized (this.status) {
285       if (!this.isConnected()) {
286         if (!this.status.isEnd()) {
287           this.status.end();
288         }
289       }
290     }
291
292   }
293
294   private static final class HandshakeResult {
295     private final boolean maxConnectionsExceeded;
296     private final int maxConnections;
297
298     private HandshakeResult(boolean maxConnectionsExceeded, int maxConnections) {
299       this.maxConnectionsExceeded = maxConnectionsExceeded;
300       this.maxConnections = maxConnections;
301     }
302
303     public int maxConnections() {
304       return this.maxConnections;
305     }
306
307     public boolean isMaxConnectionsExceeded() {
308       return this.maxConnectionsExceeded;
309     }
310   }
311
312 }
313
Popular Tags