1 4 package com.tc.net.protocol.delivery; 5 6 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 7 8 import com.tc.async.api.Sink; 9 import com.tc.net.protocol.TCNetworkMessage; 10 11 15 class GuaranteedDeliveryProtocol implements DeliveryProtocol { 16 private final StateMachineRunner send; 17 private final StateMachineRunner receive; 18 private final LinkedQueue sendQueue; 19 20 public GuaranteedDeliveryProtocol(OOOProtocolMessageDelivery delivery, Sink workSink, LinkedQueue sendQueue) { 21 this.send = new StateMachineRunner(new SendStateMachine(delivery, sendQueue), workSink); 22 this.receive = new StateMachineRunner(new ReceiveStateMachine(delivery), workSink); 23 this.sendQueue = sendQueue; 24 } 25 26 public void send(TCNetworkMessage message) { 27 try { 28 sendQueue.put(message); 29 send.addEvent(new OOOProtocolEvent()); 30 } catch (InterruptedException e) { 31 throw new AssertionError (e); 32 } 33 } 34 35 public void receive(OOOProtocolMessage protocolMessage) { 36 if (protocolMessage.isSend() || protocolMessage.isAckRequest()) { 37 receive.addEvent(new OOOProtocolEvent(protocolMessage)); 38 } else { 39 send.addEvent(new OOOProtocolEvent(protocolMessage)); 40 } 41 } 42 43 public void start() { 44 send.start(); 45 receive.start(); 46 } 47 48 public void pause() { 49 send.pause(); 50 receive.pause(); 51 } 52 53 public void resume() { 54 send.resume(); 55 receive.resume(); 56 } 57 } | Popular Tags |