1 4 package com.tc.net.protocol.delivery; 5 6 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 7 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong; 8 9 import com.tc.net.protocol.TCNetworkMessage; 10 import com.tc.util.Assert; 11 12 15 public class SendStateMachine extends AbstractStateMachine { 16 private final State ACK_REQUEST_STATE = new AckRequestState(); 17 private final State ACK_WAIT_STATE = new AckWaitState(); 18 private final State MESSAGE_WAIT_STATE = new MessageWaitState(); 19 private final SynchronizedLong sent = new SynchronizedLong(-1); 20 private final SynchronizedLong acked = new SynchronizedLong(-1); 21 private final OOOProtocolMessageDelivery delivery; 22 private final LinkedQueue sendQueue; 23 24 public SendStateMachine(OOOProtocolMessageDelivery delivery, LinkedQueue sendQueue) { 25 super(); 26 this.delivery = delivery; 27 this.sendQueue = sendQueue; 28 } 29 30 protected void basicResume() { 31 switchToState(ACK_REQUEST_STATE); 32 } 33 34 protected State initialState() { 35 Assert.eval(MESSAGE_WAIT_STATE != null); 36 return MESSAGE_WAIT_STATE; 37 } 38 39 public void execute(OOOProtocolMessage msg) { 40 Assert.eval(isStarted()); 41 getCurrentState().execute(msg); 42 } 43 44 private class MessageWaitState extends AbstractState { 45 46 public void enter() { 47 execute(null); 48 } 49 50 public void execute(OOOProtocolMessage protocolMessage) { 51 if (!sendQueue.isEmpty()) { 52 Assert.eval(protocolMessage == null); 53 sendMessage(createProtocolMessage(sent.increment())); 54 switchToState(ACK_WAIT_STATE); 55 } 56 } 57 } 58 59 private class AckRequestState extends AbstractState { 60 public void enter() { 61 if (sent.get() > acked.get()) { 62 sendAckRequest(); 63 switchToState(ACK_WAIT_STATE); 64 } else { 65 switchToState(MESSAGE_WAIT_STATE); 66 } 67 68 } 69 } 70 71 private class AckWaitState extends AbstractState { 72 public void execute(OOOProtocolMessage protocolMessage) { 73 if (protocolMessage == null) return; 75 76 if (protocolMessage.isSend()) return; 78 79 if (protocolMessage.getAckSequence() < sent.get()) { 80 sendMessage(createProtocolMessage(sent.get())); 81 } else { 82 acked.set(protocolMessage.getAckSequence()); 83 removeMessage(); 84 switchToState(MESSAGE_WAIT_STATE); 85 86 Assert.eval(acked.get() <= sent.get()); 88 } 89 } 90 } 91 92 private void sendAckRequest() { 93 delivery.sendAckRequest(); 94 } 95 96 private void sendMessage(OOOProtocolMessage protocolMessage) { 97 delivery.sendMessage(protocolMessage); 98 } 99 100 private OOOProtocolMessage createProtocolMessage(long count) { 101 TCNetworkMessage tcm = (TCNetworkMessage) sendQueue.peek(); 102 Assert.eval(tcm != null); 103 return delivery.createProtocolMessage(count, tcm); 104 } 105 106 private void removeMessage() { 107 try { 108 sendQueue.take(); 109 } catch (InterruptedException e) { 110 throw new AssertionError (e); 111 } 112 } 113 } | Popular Tags |