1 4 package com.tc.net.protocol.transport; 5 6 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 7 import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef; 8 9 import com.tc.bytes.TCByteBuffer; 10 import com.tc.logging.TCLogger; 11 import com.tc.net.MaxConnectionsExceededException; 12 import com.tc.net.TCSocketAddress; 13 import com.tc.net.core.TCConnection; 14 import com.tc.net.core.event.TCConnectionErrorEvent; 15 import com.tc.net.core.event.TCConnectionEvent; 16 import com.tc.net.core.event.TCConnectionEventListener; 17 import com.tc.net.protocol.NetworkLayer; 18 import com.tc.net.protocol.NetworkStackID; 19 import com.tc.net.protocol.TCNetworkMessage; 20 import com.tc.util.Assert; 21 import com.tc.util.TCTimeoutException; 22 23 import java.io.IOException ; 24 import java.util.ArrayList ; 25 import java.util.HashSet ; 26 import java.util.Iterator ; 27 import java.util.LinkedList ; 28 import java.util.List ; 29 import java.util.Set ; 30 31 34 abstract class MessageTransportBase implements NetworkLayer, TCConnectionEventListener, MessageTransport { 35 private static final int DISCONNECTED = 1; 36 private static final int CONNECTED = 2; 37 private static final int CONNECT_ATTEMPT = 3; 38 private static final int CLOSED = 4; 39 40 private TCConnection connection; 41 42 protected ConnectionID connectionId = ConnectionID.NULL_ID; 43 protected final MessageTransportStatus status; 44 protected final SynchronizedBoolean isOpen; 45 protected final TransportHandshakeMessageFactory messageFactory; 46 private final List listeners = new LinkedList (); 48 private final TCLogger logger; 49 private final TransportHandshakeErrorHandler handshakeErrorHandler; 50 private NetworkLayer receiveLayer; 51 52 private final Object attachingNewConnection = new Object (); 53 private final SynchronizedRef connectionCloseEvent = new SynchronizedRef(null); 54 private byte[] sourceAddress; 55 private int sourcePort; 56 private byte[] destinationAddress; 57 private int destinationPort; 58 59 protected MessageTransportBase(MessageTransportState initialState, 60 TransportHandshakeErrorHandler handshakeErrorHandler, 61 TransportHandshakeMessageFactory messageFactory, boolean isOpen, TCLogger logger) { 62 63 this.handshakeErrorHandler = handshakeErrorHandler; 64 this.messageFactory = messageFactory; 65 this.isOpen = new SynchronizedBoolean(isOpen); 66 this.logger = logger; 67 this.status = new MessageTransportStatus(initialState, logger); 68 } 69 70 public final void addTransportListeners(List toAdd) { 71 synchronized (listeners) { 72 basicAddTransportListeners(toAdd); 73 } 74 } 75 76 protected List getTransportListeners() { 77 return new ArrayList (listeners); 78 } 79 80 public final void addTransportListener(MessageTransportListener listener) { 81 synchronized (listeners) { 82 List toAdd = new ArrayList (1); 83 toAdd.add(listener); 84 basicAddTransportListeners(toAdd); 85 } 86 } 87 88 private void basicAddTransportListeners(List toAdd) { 89 Set intersection = new HashSet (toAdd); 90 intersection.retainAll(listeners); 91 if (!intersection.isEmpty()) throw new AssertionError ("Attempt to add the same listeners more than once: " 92 + intersection); 93 this.listeners.addAll(toAdd); 94 } 95 96 public final void removeTransportListeners() { 97 synchronized (listeners) { 98 this.listeners.clear(); 99 } 100 } 101 102 public final ConnectionID getConnectionId() { 103 return this.connectionId; 104 } 105 106 public final void setReceiveLayer(NetworkLayer layer) { 107 this.receiveLayer = layer; 108 } 109 110 public final void setSendLayer(NetworkLayer layer) { 111 throw new UnsupportedOperationException ("Transport layer has no send layer."); 112 } 113 114 public final void receiveTransportMessage(WireProtocolMessage message) { 115 synchronized (attachingNewConnection) { 116 if (message.getSource() == this.connection) { 117 receiveTransportMessageImpl(message); 118 } else { 119 logger.warn("Received message from an old connection: " + message); 120 } 121 } 122 } 123 124 public abstract NetworkStackID open() throws MaxConnectionsExceededException, TCTimeoutException, IOException ; 125 126 protected abstract void receiveTransportMessageImpl(WireProtocolMessage message); 127 128 protected final void receiveToReceiveLayer(WireProtocolMessage message) { 129 Assert.assertNotNull(receiveLayer); 130 Assert.eval(!(message instanceof TransportHandshakeMessage)); 131 132 if (message.getWireProtocolHeader().getProtocol() == WireProtocolHeader.PROTOCOL_TRANSPORT_HANDSHAKE) { 133 this.handleHandshakeError(new TransportHandshakeErrorContext("Received inappropriate handshake message!")); 134 } 135 136 this.receiveLayer.receive(message.getPayload()); 137 message.getWireProtocolHeader().recycle(); 138 } 139 140 public final void receive(TCByteBuffer[] msgData) { 141 throw new UnsupportedOperationException (); 142 } 143 144 147 public final void close() { 148 synchronized (isOpen) { 149 Assert.eval("Can only close an open connection", isOpen.get()); 150 isOpen.set(false); 151 fireTransportClosedEvent(); 152 } 153 154 synchronized (status) { 155 if (connection != null && !this.connection.isClosed()) { 156 this.connection.asynchClose(); 157 } 158 } 159 } 160 161 public final void send(TCNetworkMessage message) { 162 167 synchronized (status) { 168 if (!status.isEstablished()) { 169 logger.warn("Ignoring message sent to non-established transport: " + message); 170 return; 171 } 172 173 sendToConnection(message); 174 } 175 } 176 177 public final void sendToConnection(TCNetworkMessage message) { 178 if (message == null) throw new AssertionError ("Attempt to send a null message."); 179 if (!(message instanceof WireProtocolMessage)) { 180 final TCNetworkMessage payload = message; 181 182 message = WireProtocolMessageImpl.wrapMessage(message, connection); 183 Assert.eval(message.getSentCallback() == null); 184 185 final Runnable callback = payload.getSentCallback(); 186 if (callback != null) { 187 message.setSentCallback(new Runnable () { 188 public void run() { 189 callback.run(); 190 } 191 }); 192 } 193 } 194 195 WireProtocolHeader hdr = (WireProtocolHeader) message.getHeader(); 196 197 hdr.setSourceAddress(getSourceAddress()); 198 hdr.setSourcePort(getSourcePort()); 199 hdr.setDestinationAddress(getDestinationAddress()); 200 hdr.setDestinationPort(getDestinationPort()); 201 hdr.computeChecksum(); 202 203 connection.putMessage(message); 204 } 205 206 209 public final boolean isConnected() { 210 synchronized (status) { 211 return this.status.isEstablished(); 212 } 213 } 214 215 public final void attachNewConnection(TCConnection newConnection) { 216 synchronized (attachingNewConnection) { 217 getConnectionAttacher().attachNewConnection((TCConnectionEvent) this.connectionCloseEvent.get(), this.connection, 218 newConnection); 219 } 220 } 221 222 protected ConnectionAttacher getConnectionAttacher() { 223 return new DefaultConnectionAttacher(this); 224 } 225 226 protected interface ConnectionAttacher { 227 public void attachNewConnection(TCConnectionEvent closeEvent, TCConnection oldConnection, TCConnection newConnection); 228 } 229 230 private static final class DefaultConnectionAttacher implements ConnectionAttacher { 231 232 private final MessageTransportBase transport; 233 234 private DefaultConnectionAttacher(MessageTransportBase transport) { 235 this.transport = transport; 236 } 237 238 public void attachNewConnection(TCConnectionEvent closeEvent, TCConnection oldConnection, TCConnection newConnection) { 239 Assert.assertNotNull(oldConnection); 240 if (closeEvent == null || closeEvent.getSource() != oldConnection) { 241 this.transport.fireTransportDisconnectedEvent(); 244 } 245 if (oldConnection != null && oldConnection != transport.connection) { 247 oldConnection.removeListener(transport); 248 } 249 transport.wireNewConnection(newConnection); 251 } 252 } 253 254 257 258 public void connectEvent(TCConnectionEvent event) { 259 return; 260 } 261 262 public void closeEvent(TCConnectionEvent event) { 263 boolean isSameConnection = false; 264 265 synchronized (attachingNewConnection) { 266 TCConnection src = event.getSource(); 267 isSameConnection = (src == this.connection); 268 if (isSameConnection) { 269 this.connectionCloseEvent.set(event); 270 } 271 } 272 273 if (isSameConnection) { 274 fireTransportDisconnectedEvent(); 275 } 276 } 277 278 public void errorEvent(TCConnectionErrorEvent errorEvent) { 279 return; 280 } 281 282 public void endOfFileEvent(TCConnectionEvent event) { 283 return; 284 } 285 286 protected final void fireTransportConnectAttemptEvent() { 287 fireTransportEvent(CONNECT_ATTEMPT); 288 } 289 290 protected final void fireTransportConnectedEvent() { 291 logFireTransportConnectEvent(); 292 fireTransportEvent(CONNECTED); 293 } 294 295 private void logFireTransportConnectEvent() { 296 if (logger.isDebugEnabled()) { 297 logger.debug("Firing connect event..."); 298 } 299 } 300 301 protected final void fireTransportDisconnectedEvent() { 302 fireTransportEvent(DISCONNECTED); 303 } 304 305 protected final void fireTransportClosedEvent() { 306 fireTransportEvent(CLOSED); 307 } 308 309 private void fireTransportEvent(int type) { 310 synchronized (listeners) { 311 for (Iterator i = listeners.iterator(); i.hasNext();) { 312 MessageTransportListener listener = (MessageTransportListener) i.next(); 313 switch (type) { 314 case DISCONNECTED: 315 listener.notifyTransportDisconnected(this); 316 break; 317 case CONNECTED: 318 listener.notifyTransportConnected(this); 319 break; 320 case CONNECT_ATTEMPT: 321 listener.notifyTransportConnectAttempt(this); 322 break; 323 case CLOSED: 324 listener.notifyTransportClosed(this); 325 break; 326 default: 327 throw new AssertionError ("Unknown transport event: " + type); 328 } 329 } 330 } 331 } 332 333 protected void handleHandshakeError(TransportHandshakeErrorContext e) { 334 this.handshakeErrorHandler.handleHandshakeError(e); 335 } 336 337 protected void handleHandshakeError(TransportHandshakeErrorContext e, TransportHandshakeMessage m) { 338 this.handshakeErrorHandler.handleHandshakeError(e, m); 339 } 340 341 protected TCConnection getConnection() { 342 return connection; 343 } 344 345 public TCSocketAddress getRemoteAddress() { 346 return this.connection.getRemoteAddress(); 347 } 348 349 public TCSocketAddress getLocalAddress() { 350 return this.connection.getLocalAddress(); 351 } 352 353 protected void setConnection(TCConnection conn) { 354 TCConnection old = this.connection; 355 this.connection = conn; 356 clearAddressCache(); 357 this.connection.addListener(this); 358 if (old != null) { 359 old.removeListener(this); 360 } 361 } 362 363 protected void clearConnection() { 364 getConnection().close(10000); 365 this.connectionId = ConnectionID.NULL_ID; 366 this.connection.removeListener(this); 367 clearAddressCache(); 368 this.connection = null; 369 } 370 371 private void clearAddressCache() { 372 this.sourceAddress = null; 373 this.sourcePort = -1; 374 this.destinationAddress = null; 375 this.destinationPort = -1; 376 } 377 378 private byte[] getSourceAddress() { 379 if (sourceAddress == null) { return sourceAddress = connection.getLocalAddress().getAddressBytes(); } 380 return sourceAddress; 381 } 382 383 private byte[] getDestinationAddress() { 384 if (destinationAddress == null) { return destinationAddress = connection.getRemoteAddress().getAddressBytes(); } 385 return destinationAddress; 386 } 387 388 private int getSourcePort() { 389 if (sourcePort == -1) { return this.sourcePort = connection.getLocalAddress().getPort(); } 390 return sourcePort; 391 } 392 393 private int getDestinationPort() { 394 if (destinationPort == -1) { return this.destinationPort = connection.getRemoteAddress().getPort(); } 395 return sourcePort; 396 } 397 398 protected void wireNewConnection(TCConnection conn) { 399 logger.info("Attaching new connection: " + conn); 400 setConnection(conn); 401 this.status.reset(); 402 } 403 } | Popular Tags |