1 4 package com.tc.net.protocol.delivery; 5 6 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong; 7 8 import com.tc.util.Assert; 9 10 13 public class ReceiveStateMachine extends AbstractStateMachine { 14 private final State MESSAGE_WAIT_STATE = new MessageWaitState(); 15 16 private final SynchronizedLong received = new SynchronizedLong(-1); 17 private final OOOProtocolMessageDelivery delivery; 18 19 public ReceiveStateMachine(OOOProtocolMessageDelivery delivery) { 20 this.delivery = delivery; 21 } 22 23 public void execute(OOOProtocolMessage msg) { 24 getCurrentState().execute(msg); 25 } 26 27 protected State initialState() { 28 return MESSAGE_WAIT_STATE; 29 } 30 31 private class MessageWaitState extends AbstractState { 32 33 public void execute(OOOProtocolMessage protocolMessage) { 34 if (protocolMessage.isAckRequest()) { 35 delivery.sendAck(received.get()); 36 return; 37 } 38 39 final long r = protocolMessage.getSent(); 40 final long curRecv = received.get(); 41 Assert.eval(r >= curRecv); 42 if (r == curRecv) { 43 } else { 45 putMessage(protocolMessage); 46 final long next = received.increment(); 47 delivery.sendAck(next); 48 } 49 } 50 } 51 52 private void putMessage(OOOProtocolMessage msg) { 53 this.delivery.receiveMessage(msg); 54 } 55 56 } | Popular Tags |