KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > net > protocol > delivery > SendStateMachine


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
3  */

4 package com.tc.net.protocol.delivery;
5
6 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
7 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
8
9 import com.tc.net.protocol.TCNetworkMessage;
10 import com.tc.util.Assert;
11
12 /**
13  *
14  */

15 public class SendStateMachine extends AbstractStateMachine {
16   private final State ACK_REQUEST_STATE = new AckRequestState();
17   private final State ACK_WAIT_STATE = new AckWaitState();
18   private final State MESSAGE_WAIT_STATE = new MessageWaitState();
19   private final SynchronizedLong sent = new SynchronizedLong(-1);
20   private final SynchronizedLong acked = new SynchronizedLong(-1);
21   private final OOOProtocolMessageDelivery delivery;
22   private final LinkedQueue sendQueue;
23
24   public SendStateMachine(OOOProtocolMessageDelivery delivery, LinkedQueue sendQueue) {
25     super();
26     this.delivery = delivery;
27     this.sendQueue = sendQueue;
28   }
29
30   protected void basicResume() {
31     switchToState(ACK_REQUEST_STATE);
32   }
33
34   protected State initialState() {
35     Assert.eval(MESSAGE_WAIT_STATE != null);
36     return MESSAGE_WAIT_STATE;
37   }
38
39   public void execute(OOOProtocolMessage msg) {
40     Assert.eval(isStarted());
41     getCurrentState().execute(msg);
42   }
43
44   private class MessageWaitState extends AbstractState {
45
46     public void enter() {
47       execute(null);
48     }
49
50     public void execute(OOOProtocolMessage protocolMessage) {
51       if (!sendQueue.isEmpty()) {
52         Assert.eval(protocolMessage == null);
53         sendMessage(createProtocolMessage(sent.increment()));
54         switchToState(ACK_WAIT_STATE);
55       }
56     }
57   }
58
59   private class AckRequestState extends AbstractState {
60     public void enter() {
61       if (sent.get() > acked.get()) {
62         sendAckRequest();
63         switchToState(ACK_WAIT_STATE);
64       } else {
65         switchToState(MESSAGE_WAIT_STATE);
66       }
67
68     }
69   }
70
71   private class AckWaitState extends AbstractState {
72     public void execute(OOOProtocolMessage protocolMessage) {
73       // double yuck
74
if (protocolMessage == null) return;
75       
76       //yuck
77
if (protocolMessage.isSend()) return;
78       
79       if (protocolMessage.getAckSequence() < sent.get()) {
80         sendMessage(createProtocolMessage(sent.get()));
81       } else {
82         acked.set(protocolMessage.getAckSequence());
83         removeMessage();
84         switchToState(MESSAGE_WAIT_STATE);
85        
86         // ???: is this check properly synchronized?
87
Assert.eval(acked.get() <= sent.get());
88       }
89     }
90   }
91
92   private void sendAckRequest() {
93     delivery.sendAckRequest();
94   }
95
96   private void sendMessage(OOOProtocolMessage protocolMessage) {
97     delivery.sendMessage(protocolMessage);
98   }
99
100   private OOOProtocolMessage createProtocolMessage(long count) {
101     TCNetworkMessage tcm = (TCNetworkMessage) sendQueue.peek();
102     Assert.eval(tcm != null);
103     return delivery.createProtocolMessage(count, tcm);
104   }
105
106   private void removeMessage() {
107     try {
108       sendQueue.take();
109     } catch (InterruptedException JavaDoc e) {
110       throw new AssertionError JavaDoc(e);
111     }
112   }
113 }
Popular Tags