package com.tc.net.protocol.delivery;

import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;

import com.tc.async.api.Sink;
import com.tc.bytes.TCByteBuffer;
import com.tc.exception.TCRuntimeException;
import com.tc.logging.TCLogger;
import com.tc.logging.TCLogging;
import com.tc.net.MaxConnectionsExceededException;
import com.tc.net.protocol.NetworkLayer;
import com.tc.net.protocol.NetworkStackID;
import com.tc.net.protocol.TCNetworkMessage;
import com.tc.net.protocol.TCProtocolException;
import com.tc.net.protocol.transport.MessageTransport;
import com.tc.util.Assert;
import com.tc.util.TCTimeoutException;

import java.io.IOException;
import java.net.UnknownHostException;

/**
 * Network layer that sits between a send layer and a receive layer and routes all traffic through a
 * {@link GuaranteedDeliveryProtocol}.
 * <p>
 * Outbound messages handed to {@link #send(TCNetworkMessage)} and inbound buffers handed to
 * {@link #receive(TCByteBuffer[])} are both passed to the delivery protocol. The protocol in turn uses the
 * {@link OOOProtocolMessageDelivery} methods implemented here to push protocol messages to the send layer and to
 * hand received payloads to the receive layer.
 */
public class OnceAndOnlyOnceProtocolNetworkLayerImpl implements OnceAndOnlyOnceProtocolNetworkLayer,
    OOOProtocolMessageDelivery {
  private static final TCLogger            logger       = TCLogging
                                                            .getLogger(OnceAndOnlyOnceProtocolNetworkLayerImpl.class);

  private final OOOProtocolMessageFactory  messageFactory;
  private final OOOProtocolMessageParser   messageParser;
  private final GuaranteedDeliveryProtocol delivery;

  // NOTE(review): not read or written anywhere in this file. It is package-private, so other classes in this
  // package may reference it -- verify before removing.
  boolean                                  wasConnected = false;

  private NetworkLayer                     receiveLayer;
  private NetworkLayer                     sendLayer;

  /**
   * Creates the layer and immediately starts its delivery protocol.
   *
   * @param messageFactory factory used to build ack, ack-request and send protocol messages
   * @param messageParser parser used to turn received buffers into protocol messages
   * @param workSink sink handed to the {@link GuaranteedDeliveryProtocol}
   */
  public OnceAndOnlyOnceProtocolNetworkLayerImpl(OOOProtocolMessageFactory messageFactory,
                                                 OOOProtocolMessageParser messageParser, Sink workSink) {
    this.messageFactory = messageFactory;
    this.messageParser = messageParser;
    // "this" is handed to the protocol before construction finishes; the factory and parser are assigned first so
    // they are already set when the protocol is started below.
    this.delivery = new GuaranteedDeliveryProtocol(this, workSink, new LinkedQueue());
    this.delivery.start();
  }

  /*
   * Network layer interface
   */

  /**
   * Sets the layer that outbound protocol messages, and open/close/isConnected calls, are delegated to.
   */
  public void setSendLayer(NetworkLayer layer) {
    this.sendLayer = layer;
  }

  /**
   * Sets the layer that payloads of received messages are handed to.
   */
  public void setReceiveLayer(NetworkLayer layer) {
    this.receiveLayer = layer;
  }

  /**
   * Hands an outbound message to the delivery protocol.
   */
  public void send(TCNetworkMessage message) {
    delivery.send(message);
  }

  /**
   * Parses the received buffers into a protocol message and hands it to the delivery protocol.
   *
   * @throws TCRuntimeException if the buffers cannot be parsed as a protocol message
   */
  public void receive(TCByteBuffer[] msgData) {
    OOOProtocolMessage msg = createProtocolMessage(msgData);
    delivery.receive(msg);
  }

  /**
   * Reports whether the send layer is connected. The send layer must have been set.
   */
  public boolean isConnected() {
    Assert.assertNotNull(sendLayer);
    return sendLayer.isConnected();
  }

  /**
   * Opens the send layer and returns the stack id it reports. The send layer must have been set.
   */
  public NetworkStackID open() throws TCTimeoutException, UnknownHostException, IOException,
      MaxConnectionsExceededException {
    Assert.assertNotNull(sendLayer);
    return sendLayer.open();
  }

  /**
   * Closes the send layer. The send layer must have been set.
   */
  public void close() {
    Assert.assertNotNull(sendLayer);
    sendLayer.close();
  }

  /*
   * Transport listener interface
   */

  /**
   * Resumes the delivery protocol once the transport is connected.
   */
  public void notifyTransportConnected(MessageTransport transport) {
    logNotifyTransportConnected(transport);
    this.delivery.resume();
  }

  private void logNotifyTransportConnected(MessageTransport transport) {
    if (logger.isDebugEnabled()) {
      logger.debug("notifyTransportConnected(" + transport + ")");
    }
  }

  /**
   * Pauses the delivery protocol while the transport is disconnected.
   */
  public void notifyTransportDisconnected(MessageTransport transport) {
    this.delivery.pause();
  }

  public void notifyTransportConnectAttempt(MessageTransport transport) {
    // intentionally empty: nothing to do on a connect attempt
  }

  public void notifyTransportClosed(MessageTransport transport) {
    // intentionally empty: nothing to do when the transport is closed
  }

  /*
   * Protocol message delivery interface
   */

  /**
   * Sends a newly created ack-request message to the send layer.
   */
  public void sendAckRequest() {
    sendToSendLayer(this.messageFactory.createNewAckRequestMessage());
  }

  /**
   * Sends a newly created ack message for the given sequence to the send layer.
   */
  public void sendAck(long sequence) {
    sendToSendLayer(this.messageFactory.createNewAckMessage(sequence));
  }

  /**
   * Sends an already built protocol message to the send layer.
   */
  public void sendMessage(OOOProtocolMessage msg) {
    sendToSendLayer(msg);
  }

  /**
   * Hands the payload of a received "send" protocol message to the receive layer. The receive layer must have been
   * set, and the message must be non-null and report {@code isSend()}.
   */
  public void receiveMessage(OOOProtocolMessage msg) {
    Assert.assertNotNull("Receive layer is null.", this.receiveLayer);
    Assert.assertNotNull("Attempt to null msg", msg);
    Assert.eval(msg.isSend());

    this.receiveLayer.receive(msg.getPayload());
  }

  /**
   * Wraps an outbound message in a protocol "send" message carrying the given sequence. If the wrapped message has
   * a sent-callback, the same callback is installed on the protocol message.
   */
  public OOOProtocolMessage createProtocolMessage(long sequence, final TCNetworkMessage msg) {
    OOOProtocolMessage rv = messageFactory.createNewSendMessage(sequence, msg);

    Runnable callback = msg.getSentCallback();
    if (callback != null) {
      // Pass the callback through directly; an extra Runnable that only delegates to it adds nothing.
      rv.setSentCallback(callback);
    }

    return rv;
  }

  private void sendToSendLayer(OOOProtocolMessage msg) {
    // The delivery protocol is started in the constructor, before setSendLayer() can have been called, so fail
    // with a clear assertion (as the other send-layer methods do) rather than a bare NullPointerException.
    Assert.assertNotNull("Send layer is null.", this.sendLayer);
    this.sendLayer.send(msg);
  }

  private OOOProtocolMessage createProtocolMessage(TCByteBuffer[] msgData) {
    try {
      return messageParser.parseMessage(msgData);
    } catch (TCProtocolException e) {
      // Surface parse failures as unchecked, preserving the cause.
      throw new TCRuntimeException(e);
    }
  }
}