KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > net > protocol > transport > MessageTransportBase


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
3  */

4 package com.tc.net.protocol.transport;
5
6 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
7 import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef;
8
9 import com.tc.bytes.TCByteBuffer;
10 import com.tc.logging.TCLogger;
11 import com.tc.net.MaxConnectionsExceededException;
12 import com.tc.net.TCSocketAddress;
13 import com.tc.net.core.TCConnection;
14 import com.tc.net.core.event.TCConnectionErrorEvent;
15 import com.tc.net.core.event.TCConnectionEvent;
16 import com.tc.net.core.event.TCConnectionEventListener;
17 import com.tc.net.protocol.NetworkLayer;
18 import com.tc.net.protocol.NetworkStackID;
19 import com.tc.net.protocol.TCNetworkMessage;
20 import com.tc.util.Assert;
21 import com.tc.util.TCTimeoutException;
22
23 import java.io.IOException JavaDoc;
24 import java.util.ArrayList JavaDoc;
25 import java.util.HashSet JavaDoc;
26 import java.util.Iterator JavaDoc;
27 import java.util.LinkedList JavaDoc;
28 import java.util.List JavaDoc;
29 import java.util.Set JavaDoc;
30
31 /**
32  * Implementation of MessaageTransport
33  */

34 abstract class MessageTransportBase implements NetworkLayer, TCConnectionEventListener, MessageTransport {
35   private static final int DISCONNECTED = 1;
36   private static final int CONNECTED = 2;
37   private static final int CONNECT_ATTEMPT = 3;
38   private static final int CLOSED = 4;
39
40   private TCConnection connection;
41
42   protected ConnectionID connectionId = ConnectionID.NULL_ID;
43   protected final MessageTransportStatus status;
44   protected final SynchronizedBoolean isOpen;
45   protected final TransportHandshakeMessageFactory messageFactory;
46   // private final Set listeners = new HashSet();
47
private final List JavaDoc listeners = new LinkedList JavaDoc();
48   private final TCLogger logger;
49   private final TransportHandshakeErrorHandler handshakeErrorHandler;
50   private NetworkLayer receiveLayer;
51
52   private final Object JavaDoc attachingNewConnection = new Object JavaDoc();
53   private final SynchronizedRef connectionCloseEvent = new SynchronizedRef(null);
54   private byte[] sourceAddress;
55   private int sourcePort;
56   private byte[] destinationAddress;
57   private int destinationPort;
58
59   protected MessageTransportBase(MessageTransportState initialState,
60                                  TransportHandshakeErrorHandler handshakeErrorHandler,
61                                  TransportHandshakeMessageFactory messageFactory, boolean isOpen, TCLogger logger) {
62
63     this.handshakeErrorHandler = handshakeErrorHandler;
64     this.messageFactory = messageFactory;
65     this.isOpen = new SynchronizedBoolean(isOpen);
66     this.logger = logger;
67     this.status = new MessageTransportStatus(initialState, logger);
68   }
69
70   public final void addTransportListeners(List JavaDoc toAdd) {
71     synchronized (listeners) {
72       basicAddTransportListeners(toAdd);
73     }
74   }
75
76   protected List JavaDoc getTransportListeners() {
77     return new ArrayList JavaDoc(listeners);
78   }
79
80   public final void addTransportListener(MessageTransportListener listener) {
81     synchronized (listeners) {
82       List JavaDoc toAdd = new ArrayList JavaDoc(1);
83       toAdd.add(listener);
84       basicAddTransportListeners(toAdd);
85     }
86   }
87
88   private void basicAddTransportListeners(List JavaDoc toAdd) {
89     Set JavaDoc intersection = new HashSet JavaDoc(toAdd);
90     intersection.retainAll(listeners);
91     if (!intersection.isEmpty()) throw new AssertionError JavaDoc("Attempt to add the same listeners more than once: "
92                                                           + intersection);
93     this.listeners.addAll(toAdd);
94   }
95
96   public final void removeTransportListeners() {
97     synchronized (listeners) {
98       this.listeners.clear();
99     }
100   }
101
102   public final ConnectionID getConnectionId() {
103     return this.connectionId;
104   }
105
106   public final void setReceiveLayer(NetworkLayer layer) {
107     this.receiveLayer = layer;
108   }
109
110   public final void setSendLayer(NetworkLayer layer) {
111     throw new UnsupportedOperationException JavaDoc("Transport layer has no send layer.");
112   }
113
114   public final void receiveTransportMessage(WireProtocolMessage message) {
115     synchronized (attachingNewConnection) {
116       if (message.getSource() == this.connection) {
117         receiveTransportMessageImpl(message);
118       } else {
119         logger.warn("Received message from an old connection: " + message);
120       }
121     }
122   }
123
124   public abstract NetworkStackID open() throws MaxConnectionsExceededException, TCTimeoutException, IOException JavaDoc;
125
126   protected abstract void receiveTransportMessageImpl(WireProtocolMessage message);
127
128   protected final void receiveToReceiveLayer(WireProtocolMessage message) {
129     Assert.assertNotNull(receiveLayer);
130     Assert.eval(!(message instanceof TransportHandshakeMessage));
131
132     if (message.getWireProtocolHeader().getProtocol() == WireProtocolHeader.PROTOCOL_TRANSPORT_HANDSHAKE) {
133       this.handleHandshakeError(new TransportHandshakeErrorContext("Received inappropriate handshake message!"));
134     }
135
136     this.receiveLayer.receive(message.getPayload());
137     message.getWireProtocolHeader().recycle();
138   }
139
140   public final void receive(TCByteBuffer[] msgData) {
141     throw new UnsupportedOperationException JavaDoc();
142   }
143
144   /**
145    * Moves the MessageTransport state to closed and closes the underlying connection, if any.
146    */

147   public final void close() {
148     synchronized (isOpen) {
149       Assert.eval("Can only close an open connection", isOpen.get());
150       isOpen.set(false);
151       fireTransportClosedEvent();
152     }
153
154     synchronized (status) {
155       if (connection != null && !this.connection.isClosed()) {
156         this.connection.asynchClose();
157       }
158     }
159   }
160
161   public final void send(TCNetworkMessage message) {
162     // synchronized (isOpen) {
163
// Assert.eval("Can't send on an unopen transport [" +
164
// Thread.currentThread().getName() + "]", isOpen.get());
165
// }
166

167     synchronized (status) {
168       if (!status.isEstablished()) {
169         logger.warn("Ignoring message sent to non-established transport: " + message);
170         return;
171       }
172
173       sendToConnection(message);
174     }
175   }
176
177   public final void sendToConnection(TCNetworkMessage message) {
178     if (message == null) throw new AssertionError JavaDoc("Attempt to send a null message.");
179     if (!(message instanceof WireProtocolMessage)) {
180       final TCNetworkMessage payload = message;
181
182       message = WireProtocolMessageImpl.wrapMessage(message, connection);
183       Assert.eval(message.getSentCallback() == null);
184
185       final Runnable JavaDoc callback = payload.getSentCallback();
186       if (callback != null) {
187         message.setSentCallback(new Runnable JavaDoc() {
188           public void run() {
189             callback.run();
190           }
191         });
192       }
193     }
194
195     WireProtocolHeader hdr = (WireProtocolHeader) message.getHeader();
196
197     hdr.setSourceAddress(getSourceAddress());
198     hdr.setSourcePort(getSourcePort());
199     hdr.setDestinationAddress(getDestinationAddress());
200     hdr.setDestinationPort(getDestinationPort());
201     hdr.computeChecksum();
202
203     connection.putMessage(message);
204   }
205
206   /**
207    * Returns true if the underlying connection is open.
208    */

209   public final boolean isConnected() {
210     synchronized (status) {
211       return this.status.isEstablished();
212     }
213   }
214
215   public final void attachNewConnection(TCConnection newConnection) {
216     synchronized (attachingNewConnection) {
217       getConnectionAttacher().attachNewConnection((TCConnectionEvent) this.connectionCloseEvent.get(), this.connection,
218                                                   newConnection);
219     }
220   }
221
222   protected ConnectionAttacher getConnectionAttacher() {
223     return new DefaultConnectionAttacher(this);
224   }
225
226   protected interface ConnectionAttacher {
227     public void attachNewConnection(TCConnectionEvent closeEvent, TCConnection oldConnection, TCConnection newConnection);
228   }
229
230   private static final class DefaultConnectionAttacher implements ConnectionAttacher {
231
232     private final MessageTransportBase transport;
233
234     private DefaultConnectionAttacher(MessageTransportBase transport) {
235       this.transport = transport;
236     }
237
238     public void attachNewConnection(TCConnectionEvent closeEvent, TCConnection oldConnection, TCConnection newConnection) {
239       Assert.assertNotNull(oldConnection);
240       if (closeEvent == null || closeEvent.getSource() != oldConnection) {
241         // We either didn't receive a close event or we received a close event
242
// from a connection that isn't our current connection.
243
this.transport.fireTransportDisconnectedEvent();
244       }
245       // remove the transport as a listener for the old connection
246
if (oldConnection != null && oldConnection != transport.connection) {
247         oldConnection.removeListener(transport);
248       }
249       // set the new connection to the current connection.
250
transport.wireNewConnection(newConnection);
251     }
252   }
253
254   /*********************************************************************************************************************
255    * TCConnection listener interface
256    */

257
258   public void connectEvent(TCConnectionEvent event) {
259     return;
260   }
261
262   public void closeEvent(TCConnectionEvent event) {
263     boolean isSameConnection = false;
264
265     synchronized (attachingNewConnection) {
266       TCConnection src = event.getSource();
267       isSameConnection = (src == this.connection);
268       if (isSameConnection) {
269         this.connectionCloseEvent.set(event);
270       }
271     }
272
273     if (isSameConnection) {
274       fireTransportDisconnectedEvent();
275     }
276   }
277
278   public void errorEvent(TCConnectionErrorEvent errorEvent) {
279     return;
280   }
281
282   public void endOfFileEvent(TCConnectionEvent event) {
283     return;
284   }
285
286   protected final void fireTransportConnectAttemptEvent() {
287     fireTransportEvent(CONNECT_ATTEMPT);
288   }
289
290   protected final void fireTransportConnectedEvent() {
291     logFireTransportConnectEvent();
292     fireTransportEvent(CONNECTED);
293   }
294
295   private void logFireTransportConnectEvent() {
296     if (logger.isDebugEnabled()) {
297       logger.debug("Firing connect event...");
298     }
299   }
300
301   protected final void fireTransportDisconnectedEvent() {
302     fireTransportEvent(DISCONNECTED);
303   }
304
305   protected final void fireTransportClosedEvent() {
306     fireTransportEvent(CLOSED);
307   }
308
309   private void fireTransportEvent(int type) {
310     synchronized (listeners) {
311       for (Iterator JavaDoc i = listeners.iterator(); i.hasNext();) {
312         MessageTransportListener listener = (MessageTransportListener) i.next();
313         switch (type) {
314           case DISCONNECTED:
315             listener.notifyTransportDisconnected(this);
316             break;
317           case CONNECTED:
318             listener.notifyTransportConnected(this);
319             break;
320           case CONNECT_ATTEMPT:
321             listener.notifyTransportConnectAttempt(this);
322             break;
323           case CLOSED:
324             listener.notifyTransportClosed(this);
325             break;
326           default:
327             throw new AssertionError JavaDoc("Unknown transport event: " + type);
328         }
329       }
330     }
331   }
332
333   protected void handleHandshakeError(TransportHandshakeErrorContext e) {
334     this.handshakeErrorHandler.handleHandshakeError(e);
335   }
336
337   protected void handleHandshakeError(TransportHandshakeErrorContext e, TransportHandshakeMessage m) {
338     this.handshakeErrorHandler.handleHandshakeError(e, m);
339   }
340
341   protected TCConnection getConnection() {
342     return connection;
343   }
344
345   public TCSocketAddress getRemoteAddress() {
346     return this.connection.getRemoteAddress();
347   }
348
349   public TCSocketAddress getLocalAddress() {
350     return this.connection.getLocalAddress();
351   }
352
353   protected void setConnection(TCConnection conn) {
354     TCConnection old = this.connection;
355     this.connection = conn;
356     clearAddressCache();
357     this.connection.addListener(this);
358     if (old != null) {
359       old.removeListener(this);
360     }
361   }
362
363   protected void clearConnection() {
364     getConnection().close(10000);
365     this.connectionId = ConnectionID.NULL_ID;
366     this.connection.removeListener(this);
367     clearAddressCache();
368     this.connection = null;
369   }
370
371   private void clearAddressCache() {
372     this.sourceAddress = null;
373     this.sourcePort = -1;
374     this.destinationAddress = null;
375     this.destinationPort = -1;
376   }
377
378   private byte[] getSourceAddress() {
379     if (sourceAddress == null) { return sourceAddress = connection.getLocalAddress().getAddressBytes(); }
380     return sourceAddress;
381   }
382
383   private byte[] getDestinationAddress() {
384     if (destinationAddress == null) { return destinationAddress = connection.getRemoteAddress().getAddressBytes(); }
385     return destinationAddress;
386   }
387
388   private int getSourcePort() {
389     if (sourcePort == -1) { return this.sourcePort = connection.getLocalAddress().getPort(); }
390     return sourcePort;
391   }
392
393   private int getDestinationPort() {
394     if (destinationPort == -1) { return this.destinationPort = connection.getRemoteAddress().getPort(); }
395     return sourcePort;
396   }
397
398   protected void wireNewConnection(TCConnection conn) {
399     logger.info("Attaching new connection: " + conn);
400     setConnection(conn);
401     this.status.reset();
402   }
403 }
Popular Tags