KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > pbcast > Digest


1 // $Id: Digest.java,v 1.8 2004/10/08 12:09:10 belaban Exp $
2

3 package org.jgroups.protocols.pbcast;
4
5 import org.apache.commons.logging.Log;
6 import org.apache.commons.logging.LogFactory;
7 import org.jgroups.Address;
8 import org.jgroups.util.Streamable;
9 import org.jgroups.util.Util;
10
11 import java.io.*;
12
13
14 /**
15  * A message digest, which is used e.g. by the PBCAST layer for gossiping (also used by NAKACK for
16  * keeping track of current seqnos for all members). It contains pairs of senders and a range of seqnos
17  * (low and high), where each sender is associated with its highest and lowest seqnos seen so far. That
18  * is, the lowest seqno which was not yet garbage-collected and the highest that was seen so far and is
19  * deliverable (or was already delivered) to the application. A range of [0 - 0] means no messages have
20  * been received yet. <p> April 3 2001 (bela): Added high_seqnos_seen member. It is used to disseminate
21  * information about the last (highest) message M received from a sender P. Since we might be using a
22  * negative acknowledgment message numbering scheme, we would never know if the last message was
23  * lost. Therefore we periodically gossip and include the last message seqno. Members who haven't seen
24  * it (e.g. because msg was dropped) will request a retransmission. See DESIGN for details.
25  * @author Bela Ban
26  */

27 public class Digest implements Externalizable, Streamable {
28     Address[] senders=null;
29     long[] low_seqnos=null; // lowest seqnos seen
30
long[] high_seqnos=null; // highest seqnos seen so far *that are deliverable*, initially 0
31
long[] high_seqnos_seen=null; // highest seqnos seen so far (not necessarily deliverable), initially -1
32
int index=0; // current index of where next member is added
33
protected static final Log log=LogFactory.getLog(Digest.class);
34
35
36     public Digest() {
37     } // used for externalization
38

39     public Digest(int size) {
40         reset(size);
41     }
42
43
44     public boolean equals(Object JavaDoc obj) {
45         if(obj == null)
46             return false;
47         Digest other=(Digest)obj;
48         if(sameSenders(other) == false)
49             return false;
50
51         if(!Util.match(low_seqnos, other.low_seqnos))
52             return false;
53         if(!Util.match(high_seqnos, other.high_seqnos))
54             return false;
55         if(!Util.match(high_seqnos_seen, other.high_seqnos_seen))
56             return false;
57
58         return true;
59     }
60
61
62
63
64     public void add(Address sender, long low_seqno, long high_seqno) {
65         if(index >= senders.length) {
66             if(log.isErrorEnabled()) log.error("index " + index +
67                     " out of bounds, please create new Digest if you want more members !");
68             return;
69         }
70         if(sender == null) {
71             if(log.isErrorEnabled()) log.error("sender is null, will not add it !");
72             return;
73         }
74         senders[index]=sender;
75         low_seqnos[index]=low_seqno;
76         high_seqnos[index]=high_seqno;
77         high_seqnos_seen[index]=-1;
78         index++;
79     }
80
81
82     public void add(Address sender, long low_seqno, long high_seqno, long high_seqno_seen) {
83         if(index >= senders.length) {
84             if(log.isErrorEnabled()) log.error("index " + index +
85                     " out of bounds, please create new Digest if you want more members !");
86             return;
87         }
88         if(sender == null) {
89             if(log.isErrorEnabled()) log.error("sender is null, will not add it !");
90             return;
91         }
92         senders[index]=sender;
93         low_seqnos[index]=low_seqno;
94         high_seqnos[index]=high_seqno;
95         high_seqnos_seen[index]=high_seqno_seen;
96         index++;
97     }
98
99
100     public void add(Digest d) {
101         Address sender;
102         long low_seqno, high_seqno, high_seqno_seen;
103
104         if(d != null) {
105             for(int i=0; i < d.size(); i++) {
106                 sender=d.senderAt(i);
107                 low_seqno=d.lowSeqnoAt(i);
108                 high_seqno=d.highSeqnoAt(i);
109                 high_seqno_seen=d.highSeqnoSeenAt(i);
110                 add(sender, low_seqno, high_seqno, high_seqno_seen);
111             }
112         }
113     }
114
115
116     /**
117      * Adds a digest to this digest. This digest must have enough space to add the other digest; otherwise an error
118      * message will be written. For each sender in the other digest, the merge() method will be called.
119      */

120     public void merge(Digest d) {
121         Address sender;
122         long low_seqno, high_seqno, high_seqno_seen;
123
124         if(d == null) {
125             if(log.isErrorEnabled()) log.error("digest to be merged with is null");
126             return;
127         }
128         for(int i=0; i < d.size(); i++) {
129             sender=d.senderAt(i);
130             low_seqno=d.lowSeqnoAt(i);
131             high_seqno=d.highSeqnoAt(i);
132             high_seqno_seen=d.highSeqnoSeenAt(i);
133             merge(sender, low_seqno, high_seqno, high_seqno_seen);
134         }
135     }
136
137
138     /**
139      * Similar to add(), but if the sender already exists, its seqnos will be modified (no new entry) as follows:
140      * <ol>
141      * <li>this.low_seqno=min(this.low_seqno, low_seqno)
142      * <li>this.high_seqno=max(this.high_seqno, high_seqno)
143      * <li>this.high_seqno_seen=max(this.high_seqno_seen, high_seqno_seen)
144      * </ol>
145      * If the sender doesn not exist, a new entry will be added (provided there is enough space)
146      */

147     public void merge(Address sender, long low_seqno, long high_seqno, long high_seqno_seen) {
148         int index;
149         long my_low_seqno, my_high_seqno, my_high_seqno_seen;
150         if(sender == null) {
151             if(log.isErrorEnabled()) log.error("sender == null");
152             return;
153         }
154         index=getIndex(sender);
155         if(index == -1) {
156             add(sender, low_seqno, high_seqno, high_seqno_seen);
157             return;
158         }
159
160         my_low_seqno=lowSeqnoAt(index);
161         my_high_seqno=highSeqnoAt(index);
162         my_high_seqno_seen=highSeqnoSeenAt(index);
163         if(low_seqno < my_low_seqno)
164             setLowSeqnoAt(index, low_seqno);
165         if(high_seqno > my_high_seqno)
166             setHighSeqnoAt(index, high_seqno);
167         if(high_seqno_seen > my_high_seqno_seen)
168             setHighSeqnoSeenAt(index, high_seqno_seen);
169     }
170
171
172     public int getIndex(Address sender) {
173         int ret=-1;
174
175         if(sender == null) return ret;
176         for(int i=0; i < senders.length; i++)
177             if(sender.equals(senders[i]))
178                 return i;
179         return ret;
180     }
181
182
183     public boolean contains(Address sender) {
184         return getIndex(sender) != -1;
185     }
186
187
188     /**
189      * Compares two digests and returns true if the senders are the same, otherwise false
190      * @param other
191      * @return
192      */

193     public boolean sameSenders(Digest other) {
194         Address a1, a2;
195         if(other == null) return false;
196         if(this.senders == null || other.senders == null) return false;
197         if(this.senders.length != other.senders.length) return false;
198         for(int i=0; i < this.senders.length; i++) {
199             a1=this.senders[i];
200             a2=other.senders[i];
201             if(a1 == null && a2 == null) continue;
202             if(a1 != null && a2 != null && a1.equals(a2))
203                 continue;
204             else
205                 return false;
206         }
207         return true;
208     }
209
210     /** Increment the sender's high_seqno by 1 */
211     public void incrementHighSeqno(Address sender) {
212         if(sender == null) return;
213         for(int i=0; i < senders.length; i++) {
214             if(senders[i] != null && senders[i].equals(sender)) {
215                 high_seqnos[i]=high_seqnos[i] + 1;
216                 break;
217             }
218         }
219     }
220
221
222     public int size() {
223         return senders.length;
224     }
225
226
227     public Address senderAt(int index) {
228         if(index < size())
229             return senders[index];
230         else {
231             if(log.isErrorEnabled()) log.error("index " + index + " is out of bounds");
232             return null;
233         }
234     }
235
236
237     /**
238      * Resets the seqnos for the sender at 'index' to 0. This happens when a member has left the group,
239      * but it is still in the digest. Resetting its seqnos ensures that no-one will request a message
240      * retransmission from the dead member.
241      */

242     public void resetAt(int index) {
243         if(index < size()) {
244             low_seqnos[index]=0;
245             high_seqnos[index]=0;
246             high_seqnos_seen[index]=-1;
247         }
248         else
249             if(log.isErrorEnabled()) log.error("index " + index + " is out of bounds");
250     }
251
252
253     public void reset(int size) {
254         senders=new Address[size];
255         low_seqnos=new long[size];
256         high_seqnos=new long[size];
257         high_seqnos_seen=new long[size];
258         for(int i=0; i < size; i++)
259             high_seqnos_seen[i]=-1;
260         index=0;
261     }
262
263
264     public long lowSeqnoAt(int index) {
265         if(index < size())
266             return low_seqnos[index];
267         else {
268             if(log.isErrorEnabled()) log.error("index " + index + " is out of bounds");
269             return 0;
270         }
271     }
272
273
274     public long highSeqnoAt(int index) {
275         if(index < size())
276             return high_seqnos[index];
277         else {
278             if(log.isErrorEnabled()) log.error("index " + index + " is out of bounds");
279             return 0;
280         }
281     }
282
283     public long highSeqnoSeenAt(int index) {
284         if(index < size())
285             return high_seqnos_seen[index];
286         else {
287             if(log.isErrorEnabled()) log.error("index " + index + " is out of bounds");
288             return 0;
289         }
290     }
291
292
293     public long highSeqnoAt(Address sender) {
294         long ret=-1;
295         int index;
296
297         if(sender == null) return ret;
298         index=getIndex(sender);
299         if(index == -1)
300             return ret;
301         else
302             return high_seqnos[index];
303     }
304
305
306     public long highSeqnoSeenAt(Address sender) {
307         long ret=-1;
308         int index;
309
310         if(sender == null) return ret;
311         index=getIndex(sender);
312         if(index == -1)
313             return ret;
314         else
315             return high_seqnos_seen[index];
316     }
317
318     public void setLowSeqnoAt(int index, long low_seqno) {
319         if(index < size()) {
320             low_seqnos[index]=low_seqno;
321         }
322         else
323             if(log.isErrorEnabled()) log.error("index " + index + " is out of bounds");
324     }
325
326
327     public void setHighSeqnoAt(int index, long high_seqno) {
328         if(index < size()) {
329             high_seqnos[index]=high_seqno;
330         }
331         else
332             if(log.isErrorEnabled()) log.error("index " + index + " is out of bounds");
333     }
334
335     public void setHighSeqnoSeenAt(int index, long high_seqno_seen) {
336         if(index < size()) {
337             high_seqnos_seen[index]=high_seqno_seen;
338         }
339         else
340             if(log.isErrorEnabled()) log.error("index " + index + " is out of bounds");
341     }
342
343
344     public void setHighSeqnoAt(Address sender, long high_seqno) {
345         int index=getIndex(sender);
346         if(index < 0)
347             return;
348         else
349             setHighSeqnoAt(index, high_seqno);
350     }
351
352     public void setHighSeqnoSeenAt(Address sender, long high_seqno_seen) {
353         int index=getIndex(sender);
354         if(index < 0)
355             return;
356         else
357             setHighSeqnoSeenAt(index, high_seqno_seen);
358     }
359
360
361     public Digest copy() {
362         Digest ret=new Digest(senders.length);
363
364         // changed due to JDK bug (didn't work under JDK 1.4.{1,2} under Linux, JGroups bug #791718
365
// ret.senders=(Address[])senders.clone();
366
if(senders != null)
367             System.arraycopy(senders, 0, ret.senders, 0, senders.length);
368
369         ret.low_seqnos=(long[])low_seqnos.clone();
370         ret.high_seqnos=(long[])high_seqnos.clone();
371         ret.high_seqnos_seen=(long[])high_seqnos_seen.clone();
372         return ret;
373     }
374
375
376     public String JavaDoc toString() {
377         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
378         boolean first=true;
379         if(senders == null) return "[]";
380         for(int i=0; i < senders.length; i++) {
381             if(!first) {
382                 sb.append(", ");
383             }
384             else {
385                 sb.append('[');
386                 first=false;
387             }
388             sb.append(senders[i]).append(": ").append('[').append(low_seqnos[i]).append(" : ");
389             sb.append(high_seqnos[i]);
390             if(high_seqnos_seen[i] >= 0)
391                 sb.append(" (").append(high_seqnos_seen[i]).append(")]");
392         }
393         sb.append(']');
394         return sb.toString();
395     }
396
397
398     public String JavaDoc printHighSeqnos() {
399         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
400         boolean first=true;
401         for(int i=0; i < senders.length; i++) {
402             if(!first) {
403                 sb.append(", ");
404             }
405             else {
406                 sb.append('[');
407                 first=false;
408             }
409             sb.append(senders[i]);
410             sb.append('#');
411             sb.append(high_seqnos[i]);
412         }
413         sb.append(']');
414         return sb.toString();
415     }
416
417
418     public String JavaDoc printHighSeqnosSeen() {
419         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
420         boolean first=true;
421         for(int i=0; i < senders.length; i++) {
422             if(!first) {
423                 sb.append(", ");
424             }
425             else {
426                 sb.append('[');
427                 first=false;
428             }
429             sb.append(senders[i]);
430             sb.append('#');
431             sb.append(high_seqnos_seen[i]);
432         }
433         sb.append(']');
434         return sb.toString();
435     }
436
437
438     public void writeExternal(ObjectOutput out) throws IOException {
439         out.writeObject(senders);
440
441         if(low_seqnos == null)
442             out.writeInt(0);
443         else {
444             out.writeInt(low_seqnos.length);
445             for(int i=0; i < low_seqnos.length; i++)
446                 out.writeLong(low_seqnos[i]);
447         }
448
449         if(high_seqnos == null)
450             out.writeInt(0);
451         else {
452             out.writeInt(high_seqnos.length);
453             for(int i=0; i < high_seqnos.length; i++)
454                 out.writeLong(high_seqnos[i]);
455         }
456
457         if(high_seqnos_seen == null)
458             out.writeInt(0);
459         else {
460             out.writeInt(high_seqnos_seen.length);
461             for(int i=0; i < high_seqnos_seen.length; i++)
462                 out.writeLong(high_seqnos_seen[i]);
463         }
464
465         out.writeInt(index);
466     }
467
468
469     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
470         int num;
471
472         senders=(Address[])in.readObject();
473
474         num=in.readInt();
475         if(num == 0)
476             low_seqnos=null;
477         else {
478             low_seqnos=new long[num];
479             for(int i=0; i < low_seqnos.length; i++)
480                 low_seqnos[i]=in.readLong();
481         }
482
483
484         num=in.readInt();
485         if(num == 0)
486             high_seqnos=null;
487         else {
488             high_seqnos=new long[num];
489             for(int i=0; i < high_seqnos.length; i++)
490                 high_seqnos[i]=in.readLong();
491         }
492
493         num=in.readInt();
494         if(num == 0)
495             high_seqnos_seen=null;
496         else {
497             high_seqnos_seen=new long[num];
498             for(int i=0; i < high_seqnos_seen.length; i++)
499                 high_seqnos_seen[i]=in.readLong();
500         }
501
502         index=in.readInt();
503     }
504
505     public void writeTo(DataOutputStream out) throws IOException {
506         out.writeInt(senders == null? 0 : senders.length);
507         for(int i=0; i < senders.length; i++) {
508             Address sender=senders[i];
509             Util.writeAddress(sender, out);
510         }
511         writeArray(low_seqnos, out);
512         writeArray(high_seqnos, out);
513         writeArray(high_seqnos_seen, out);
514         out.writeInt(index);
515     }
516
517     private void writeArray(long[] arr, DataOutputStream out) throws IOException {
518         int len=arr != null? arr.length : 0;
519         out.writeInt(len);
520         if(len > 0) {
521             for(int i=0; i < arr.length; i++) {
522                 out.writeLong(arr[i]);
523             }
524         }
525     }
526
527     private long[] readArray(DataInputStream in) throws IOException {
528         int b=in.readInt();
529         if(b == 0)
530             return null;
531         long[] retval=new long[b];
532         for(int i=0; i < b; i++)
533             retval[i]=in.readLong();
534         return retval;
535     }
536
537     public void readFrom(DataInputStream in) throws IOException, IllegalAccessException JavaDoc, InstantiationException JavaDoc {
538         int b=in.readInt();
539         if(b > 0) {
540             senders=new Address[b];
541             Address sender;
542             for(int i=0; i < b; i++) {
543                 sender=Util.readAddress(in);
544                 senders[i]=sender;
545             }
546         }
547         low_seqnos=readArray(in);
548         high_seqnos=readArray(in);
549         high_seqnos_seen=readArray(in);
550         index=in.readInt();
551     }
552
553
554
555 }
556
Popular Tags