// KickJava   Java API By Example, From Geeks To Geeks.
//
// Java > Open Source Codes > org > jgroups > protocols > CAUSAL


// $Id: CAUSAL.java,v 1.6 2004/07/05 14:17:15 belaban Exp $

package org.jgroups.protocols;

import org.jgroups.*;
import org.jgroups.stack.Protocol;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Vector;

/**
 * <p>
 * Implements a causal ordering layer using vector clocks.
 * </p>
 * <p>
 * The causal protocol layer guarantees that if message m0, multicast
 * by process group member p0, causes process group member p1 to
 * multicast message m1, then all remaining process group members in
 * the current view will receive the messages in the order m0
 * followed by m1.
 * </p>
 * <p>
 * When first encountered, causal order seems very similar to FIFO order, but
 * there is an important distinction. While FIFO order guarantees that
 * if process group member p0 multicasts m0 followed by m1 the messages
 * will be delivered in the order m0, m1 to all other group members, causal
 * order expands this notion of order from a single group member "space"
 * to the whole group space, i.e. if p0 sends message m0 which causes member
 * p1 to send message m1, then all other group members are guaranteed to
 * receive m0 followed by m1.
 * </p>
 * <p>
 * The causal protocol layer achieves this ordering by introducing a sense of
 * time in a group using vector clocks. The idea is very simple. Each message
 * is labeled with a vector, contained in a causal header, representing the number of
 * prior causal messages received by the sending group member. A vector time of [3,5,2,4] in
 * a group of four members [p0,p1,p2,p3] means that process p0 has sent 3 messages
 * and has received 5, 2 and 4 messages from members p1, p2 and p3 respectively.
 * </p>
 * <p>
 * Each member increases its own counter by 1 when it sends a message. When receiving
 * message mi from a member pi (where pi != pj) containing vector time VT(mi),
 * process pj delays delivery of message mi until:
 * </p>
 * <p>
 * for every k:1..n
 *
 * VT(mi)[k] == VT(pj)[k] + 1 if k=i,
 * VT(mi)[k] &lt;= VT(pj)[k] otherwise
 * </p>
 * <p>
 * After the next causal message is delivered at process group member pj, VT(pj) is
 * updated as follows:
 * </p>
 * <p>
 * for every k:1..n VT(pj)[k] = max(VT(mi)[k], VT(pj)[k])
 * </p>
 * @author Vladimir Blagojevic vladimir@cs.yorku.ca
 * @version $Revision: 1.6 $
 */
70 public class CAUSAL extends Protocol
71 {
72
73    public static class CausalHeader extends Header
74    {
75       /**
76        * vector timestamp of this header/message
77        */

78       private TransportedVectorTime t;
79
80       /**
81        *used for externalization
82        */

83       public CausalHeader()
84       {
85       }
86
87       public CausalHeader(TransportedVectorTime timeVector)
88       {
89          t = timeVector;
90       }
91
92       /**
93        *Returns a vector timestamp carreid by this header
94        *@return Vector timestamp contained in this header
95        */

96       public TransportedVectorTime getVectorTime()
97       {
98          return t;
99       }
100
101       /**
102        * Size of this vector timestamp estimation, used in fragmetation
103        * @return headersize in bytes
104        */

105       public long size()
106       {
107
108          /*why 231, don't know but these are this values I get when
109          flattening the object into byte buffer*/

110          return 231 + (t.size() * 4);
111       }
112
113       /**
114        * Manual serialization
115        *
116        *
117        */

118       public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc
119       {
120          out.writeObject(t);
121       }
122
123       /**
124        * Manual deserialization
125        *
126        */

127       public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc,
128             ClassNotFoundException JavaDoc
129       {
130          t = (TransportedVectorTime) in.readObject();
131       }
132
133       public String JavaDoc toString()
134       {
135          return "[CAUSALHEADER:" + t + ']';
136       }
137    }
138
139
140    /**
141     *Vector time clock belonging to the member that "owns" this stack
142     */

143    private VectorTime localVector;
144
145    /**
146     * dealy queue containg messages waiting for the delivery i.e causal order
147     */

148    private LinkedList JavaDoc delayQueue;
149
150    /**
151     *Address of this group member
152     */

153    private Address localAddress;
154
155
156    /**
157     *default constructor
158     */

159    public CAUSAL()
160    {
161    }
162
163    /**
164     * Adds a vectortimestamp to a sorted queue
165     * @param tvt A vector time stamp
166     */

167    private void addToDelayQueue(TransportedVectorTime tvt)
168    {
169       ListIterator JavaDoc i = delayQueue.listIterator(0);
170       TransportedVectorTime current = null;
171       while (i.hasNext())
172       {
173          current = (TransportedVectorTime) i.next();
174          if (tvt.lessThanOrEqual(current))
175          {
176             delayQueue.add(i.previousIndex(), tvt);
177             return;
178          }
179       }
180       delayQueue.add(tvt);
181    }
182
183    /**
184     * Processes Event going down in the stack
185     * @param evt Event passed from the stack above Causal
186     */

187    public void down(Event evt)
188    {
189       switch (evt.getType())
190       {
191          case Event.MSG:
192             Message msg = (Message) evt.getArg();
193
194             //dont stamp unicasts
195
if (msg.getDest() != null && !msg.getDest().isMulticastAddress())
196                break;
197
198               Message causalMsg=new Message(msg.getDest(), msg.getSrc(), msg);
199               synchronized (this)
200             {
201                localVector.increment();
202                causalMsg.putHeader(getName(), new CausalHeader(localVector.getTransportedVectorTime()));
203             }
204             passDown(new Event(Event.MSG, causalMsg));
205             return;
206       }
207       passDown(evt);
208    }
209
210    /**
211     * Processes Event going up through the stack
212     * @param evt Event passed from the stack below Causal
213     */

214    public void up(Event evt)
215    {
216       switch (evt.getType())
217       {
218          case Event.SET_LOCAL_ADDRESS:
219             localAddress = (Address) evt.getArg();
220             localVector = new VectorTime(localAddress);
221             delayQueue = new LinkedList JavaDoc();
222             break;
223
224          case Event.VIEW_CHANGE:
225             Vector JavaDoc newViewMembers = ((View) evt.getArg()).getMembers();
226             localVector.merge((Vector JavaDoc) newViewMembers.clone());
227             localVector.reset();
228             break;
229
230          case Event.MSG:
231             Object JavaDoc obj = null;
232             Message msg = (Message) evt.getArg();
233
234             if (!((obj = msg.getHeader(getName())) instanceof CausalHeader))
235             {
236                if(log.isErrorEnabled()) log.error("NO CAUSAL.Header found");
237            passUp(evt);
238            return;
239             }
240
241             CausalHeader header = (CausalHeader) obj;
242             TransportedVectorTime messageVector = header.getVectorTime();
243
244             synchronized (this)
245             {
246                if (localVector.isCausallyNext(messageVector))
247                {
248                    Object JavaDoc tmp=msg.getObject();
249                    passUp(new Event(Event.MSG, tmp));
250                   localVector.max(messageVector);
251                }
252                else
253                {
254                   messageVector.setAssociatedMessage(msg);
255                   addToDelayQueue(messageVector);
256                }
257                TransportedVectorTime queuedVector = null;
258                while ((delayQueue.size() > 0) &&
259                      localVector.isCausallyNext((queuedVector = (TransportedVectorTime) delayQueue.getFirst())))
260                {
261                   delayQueue.remove(queuedVector);
262                    Object JavaDoc tmp=queuedVector.getAssociatedMessage().getObject();
263                    passUp(new Event(Event.MSG, tmp));
264                   localVector.max(queuedVector);
265                }
266                return;
267             }
268
269       }
270       passUp(evt);
271    }
272
273    /**
274     * Returns a name of this stack, each stackhas to have unique name
275     * @return stack's name - CAUSAL
276     */

277    public String JavaDoc getName()
278    {
279       return "CAUSAL";
280    }
281
282
283 }
// Popular Tags