1 4 package com.tc.net.protocol.transport; 5 6 import com.tc.logging.TCLogger; 7 import com.tc.logging.TCLogging; 8 import com.tc.net.core.TCConnection; 9 import com.tc.net.core.event.TCConnectionEvent; 10 import com.tc.net.protocol.NetworkStackID; 11 import com.tc.util.Assert; 12 13 public class ServerMessageTransport extends MessageTransportBase { 14 15 private static final TCLogger logger = TCLogging.getLogger(ServerMessageTransport.class); 16 17 public ServerMessageTransport(ConnectionID connectionID, TransportHandshakeErrorHandler handshakeErrorHandler, 18 TransportHandshakeMessageFactory messageFactory) { 19 super(MessageTransportState.STATE_RESTART, handshakeErrorHandler, messageFactory, true, logger); 20 this.connectionId = connectionID; 21 } 22 23 28 public ServerMessageTransport(ConnectionID connectionId, TCConnection conn, 29 TransportHandshakeErrorHandler handshakeErrorHandler, 30 TransportHandshakeMessageFactory messageFactory) { 31 super(MessageTransportState.STATE_START, handshakeErrorHandler, messageFactory, true, logger); 32 this.connectionId = connectionId; 33 Assert.assertNotNull(conn); 34 wireNewConnection(conn); 35 } 36 37 protected ConnectionAttacher getConnectionAttacher() { 38 if (this.status.isRestart()) { 39 return new RestartConnectionAttacher(); 40 } else return super.getConnectionAttacher(); 41 } 42 43 public NetworkStackID open() { 44 throw new UnsupportedOperationException ("Server transport doesn't support open()"); 45 } 46 47 protected void receiveTransportMessageImpl(WireProtocolMessage message) { 48 synchronized (status) { 49 if (status.isStart()) { 50 verifyAndHandleAck(message); 51 message.recycle(); 52 return; 53 } else if (verifyHandshakeMessage(message)) { 54 handleHandshakeError(new TransportHandshakeErrorContext("Unexpected handshake message in state: " + status), 55 (TransportHandshakeMessage) message); 56 return; 57 } 58 } 59 super.receiveToReceiveLayer(message); 60 } 61 62 private void verifyAndHandleAck(WireProtocolMessage message) { 63 if (!verifyAck(message)) { 64 handleHandshakeError(new TransportHandshakeErrorContext("Expected an ACK message but received: " + message)); 65 } else { 66 handleAck((TransportHandshakeMessage) message); 67 } 68 } 69 70 private void handleAck(TransportHandshakeMessage ack) { 71 synchronized (status) { 72 Assert.eval(status.isStart()); 73 Assert.eval("Wrong connection ID: [" + this.connectionId + "] != [" + ack.getConnectionId() + "]", 74 this.connectionId.equals(ack.getConnectionId())); 75 status.established(); 76 fireTransportConnectedEvent(); 77 } 78 } 79 80 private boolean verifyAck(WireProtocolMessage message) { 81 return message instanceof TransportHandshakeMessage && ((TransportHandshakeMessage)message).isAck(); 82 } 83 84 private boolean verifyHandshakeMessage(WireProtocolMessage message) { 85 return message instanceof TransportHandshakeMessage; 86 } 87 88 private final class RestartConnectionAttacher implements ConnectionAttacher { 89 90 public void attachNewConnection(TCConnectionEvent closeEvent, TCConnection oldConnection, TCConnection newConnection) { 91 Assert.assertNull(oldConnection); 92 wireNewConnection(newConnection); 93 } 94 95 } 96 97 } | Popular Tags |