KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > pbcast > PBCAST


1 // $Id: PBCAST.java,v 1.10 2004/09/23 16:29:38 belaban Exp $
2

3 package org.jgroups.protocols.pbcast;
4
5 import org.jgroups.Address;
6 import org.jgroups.Event;
7 import org.jgroups.Message;
8 import org.jgroups.View;
9 import org.jgroups.stack.NakReceiverWindow;
10 import org.jgroups.stack.Protocol;
11 import org.jgroups.util.List;
12 import org.jgroups.util.Queue;
13 import org.jgroups.util.QueueClosedException;
14 import org.jgroups.util.Util;
15
16 import java.util.Enumeration JavaDoc;
17 import java.util.Hashtable JavaDoc;
18 import java.util.Properties JavaDoc;
19 import java.util.Vector JavaDoc;
20
21
22 /**
23  * Implementation of probabilistic broadcast. Sends group messages via unreliable multicast. Gossips regularly to
24  * a random subset of group members to retransmit missing messages. Gossiping is used both for bringing all
25  * members to the same state (having received the same messages) and to garbage-collect messages seen by all members
26  * (gc is piggybacked in gossip messages). See DESIGN for more details.
27  * @author Bela Ban
28  */

29 public class PBCAST extends Protocol implements Runnable JavaDoc {
30     boolean operational=false;
31     long seqno=1; // seqno for messages. 1 for the first message
32
long gossip_round=1; // identifies the gossip (together with sender)
33
Address local_addr=null;
34     final Hashtable JavaDoc digest=new Hashtable JavaDoc(); // stores all messages from members (key: member, val: NakReceiverWindow)
35
Thread JavaDoc gossip_thread=null;
36     GossipHandler gossip_handler=null; // removes gossips and other requests from queue and handles them
37
final Queue gossip_queue=new Queue(); // (bounded) queue for incoming gossip requests
38
int max_queue=100; // max elements in gossip_queue (bounded buffer)
39
long gossip_interval=5000; // gossip every 5 seconds
40
double subset=0.1; // send gossip messages to a subset consisting of 10% of the mbrship
41
long desired_avg_gossip=30000; // receive a gossip every 30 secs on average
42
final Vector JavaDoc members=new Vector JavaDoc();
43     final List gossip_list=new List(); // list of gossips received, we periodically purge it (FIFO)
44
int max_gossip_cache=100; // number of gossips to keep until gossip list is purged
45
int gc_lag=30; // how many seqnos should we lag behind (see DESIGN)
46
final Hashtable JavaDoc invalid_gossipers=new Hashtable JavaDoc(); // keys=Address, val=Integer (number of gossips from suspected mbrs)
47
final int max_invalid_gossips=2; // max number of gossip from non-member before that member is shunned
48
Vector JavaDoc seen_list=null;
49     boolean shun=false; // whether invalid gossipers will be shunned or not
50
boolean dynamic=true; // whether to use dynamic or static gosssip_interval (overrides gossip_interval)
51
boolean skip_sleep=true;
52     boolean mcast_gossip=true; // use multicast for gossips (subset will be ignored, send to all members)
53

54
55     public String JavaDoc getName() {
56         return "PBCAST";
57     }
58
59
60     public Vector JavaDoc providedUpServices() {
61         Vector JavaDoc retval=new Vector JavaDoc();
62         retval.addElement(new Integer JavaDoc(Event.GET_DIGEST));
63         retval.addElement(new Integer JavaDoc(Event.SET_DIGEST));
64         retval.addElement(new Integer JavaDoc(Event.GET_DIGEST_STATE));
65         return retval;
66     }
67
68
69     public void stop() {
70         stopGossipThread();
71         stopGossipHandler();
72         operational=false;
73     }
74
75
76     public void up(Event evt) {
77         Message m;
78         PbcastHeader hdr;
79         Address sender=null;
80
81         switch(evt.getType()) {
82             case Event.MSG:
83                 m=(Message) evt.getArg();
84                 if(m.getDest() != null && !m.getDest().isMulticastAddress()) {
85                     if(!(m.getHeader(getName()) instanceof PbcastHeader))
86                         break; // unicast address: not null and not mcast, pass up unchanged
87
}
88
89                 // discard all multicast messages until we become operational (transition from joiner to member)
90
if(!operational) {
91
92                         if(log.isInfoEnabled()) log.info("event was discarded as I'm not yet operational. Event: " +
93                                                   Util.printEvent(evt));
94                     return; // don't pass up
95
}
96
97                 if(m.getHeader(getName()) instanceof PbcastHeader)
98                     hdr=(PbcastHeader) m.removeHeader(getName());
99                 else {
100                     sender=m.getSrc();
101
102                         if(log.isErrorEnabled()) log.error("PbcastHeader expected, but received header of type " +
103                                                    m.getHeader(getName()).getClass().getName() + " from " + sender +
104                                                    ". Passing event up unchanged");
105                     break;
106                 }
107
108                 switch(hdr.type) {
109                     case PbcastHeader.MCAST_MSG: // messages are handled directly (high priority)
110
handleUpMessage(m, hdr);
111                         return;
112
113                         // all other requests are put in the bounded gossip queue (discarded if full). this helps to ensure
114
// that no 'gossip storms' will occur (overflowing the buffers and the network)
115
case PbcastHeader.GOSSIP:
116                     case PbcastHeader.XMIT_REQ:
117                     case PbcastHeader.XMIT_RSP:
118                     case PbcastHeader.NOT_MEMBER:
119                         try {
120                             if(gossip_queue.size() >= max_queue) {
121
122                                     if(log.isWarnEnabled()) log.warn("gossip request " +
123                                                               PbcastHeader.type2String(hdr.type) + " discarded because " +
124                                                               "gossip_queue is full (number of elements=" + gossip_queue.size() + ')');
125                                 return;
126                             }
127                             gossip_queue.add(new GossipEntry(hdr, m.getSrc(), m.getBuffer()));
128                         }
129                         catch(Exception JavaDoc ex) {
130                             if(log.isWarnEnabled()) log.warn("exception adding request to gossip_queue, details=" + ex);
131                         }
132                         return;
133
134                     default:
135                         if(log.isErrorEnabled()) log.error("type (" + hdr.type + ") of PbcastHeader not known !");
136                         return;
137                 }
138
139             case Event.SET_LOCAL_ADDRESS:
140                 local_addr=(Address) evt.getArg();
141                 break; // pass up
142
}
143
144         passUp(evt); // pass up by default
145
}
146
147
148     public void down(Event evt) {
149         PbcastHeader hdr;
150         Message m, copy;
151         View v;
152         Vector JavaDoc mbrs;
153         Address key;
154         NakReceiverWindow win;
155
156
157         switch(evt.getType()) {
158
159             case Event.MSG:
160                 m=(Message) evt.getArg();
161                 if(m.getDest() != null && !m.getDest().isMulticastAddress()) {
162                     break; // unicast address: not null and not mcast, pass down unchanged
163
}
164                 else { // multicast address
165
hdr=new PbcastHeader(PbcastHeader.MCAST_MSG, seqno);
166                     m.putHeader(getName(), hdr);
167
168                     // put message in NakReceiverWindow (to be on the safe side if we don't receive it ...)
169
synchronized(digest) {
170                         win=(NakReceiverWindow) digest.get(local_addr);
171                         if(win == null) {
172                             if(log.isInfoEnabled()) log.info("NakReceiverWindow for sender " + local_addr +
173                                                         " not found. Creating new NakReceiverWindow starting at seqno=" + seqno);
174                             win=new NakReceiverWindow(local_addr, seqno);
175                             digest.put(local_addr, win);
176                         }
177                         copy=m.copy();
178                         copy.setSrc(local_addr);
179                         win.add(seqno, copy);
180                     }
181                     seqno++;
182                     break;
183                 }
184
185             case Event.SET_DIGEST:
186                 setDigest((Digest) evt.getArg());
187                 return; // don't pass down
188

189             case Event.GET_DIGEST: // don't pass down
190
passUp(new Event(Event.GET_DIGEST_OK, getDigest()));
191                 return;
192
193             case Event.GET_DIGEST_STATE: // don't pass down
194
passUp(new Event(Event.GET_DIGEST_STATE_OK, getDigest()));
195                 return;
196
197             case Event.VIEW_CHANGE:
198                 v=(View) evt.getArg();
199                 if(v == null) {
200                     if(log.isErrorEnabled()) log.error("view is null !");
201                     break;
202                 }
203                 mbrs=v.getMembers();
204
205                 // update internal membership list
206
synchronized(members) {
207                     members.removeAllElements();
208                     for(int i=0; i < mbrs.size(); i++)
209                         members.addElement(mbrs.elementAt(i));
210                 }
211
212                 // delete all members in digest that are not in new membership list
213
if(mbrs.size() > 0) {
214                     synchronized(digest) {
215                         for(Enumeration JavaDoc e=digest.keys(); e.hasMoreElements();) {
216                             key=(Address) e.nextElement();
217                             if(!mbrs.contains(key)) {
218                                 win=(NakReceiverWindow) digest.get(key);
219                                 win.reset();
220                                 digest.remove(key);
221                             }
222                         }
223                     }
224                 }
225
226                 // add all members from new membership list that are not yet in digest
227
for(int i=0; i < mbrs.size(); i++) {
228                     key=(Address) mbrs.elementAt(i);
229                     if(!digest.containsKey(key)) {
230                         digest.put(key, new NakReceiverWindow(key, 1));
231                     }
232                 }
233
234                 if(dynamic) {
235                     gossip_interval=computeGossipInterval(members.size(), desired_avg_gossip);
236
237                         if(log.isInfoEnabled()) log.info("VIEW_CHANGE: gossip_interval=" + gossip_interval);
238                     if(gossip_thread != null) {
239                         skip_sleep=true;
240                         gossip_thread.interrupt(); // wake up and sleep according to the new gossip_interval
241
}
242                 }
243
244                 startGossipThread(); // will only be started if not yet running
245
startGossipHandler();
246                 break;
247
248             case Event.BECOME_SERVER:
249                 operational=true;
250                 break;
251         }
252
253         passDown(evt);
254     }
255
256
257     /** Gossip thread. Sends gossips containing a message digest every <code>gossip_interval</code> msecs */
258     public void run() {
259         while(gossip_thread != null) { // stopGossipThread() sets gossip_thread to null
260
if(dynamic) {
261                 gossip_interval=computeGossipInterval(members.size(), desired_avg_gossip);
262
263                     if(log.isInfoEnabled()) log.info("gossip_interval=" + gossip_interval);
264             }
265
266             Util.sleep(gossip_interval);
267             if(skip_sleep)
268                 skip_sleep=false;
269             else
270                 sendGossip();
271         }
272     }
273
274
275     /** Setup the Protocol instance acording to the configuration string */
276     public boolean setProperties(Properties JavaDoc props) {super.setProperties(props);
277         String JavaDoc str;
278
279         str=props.getProperty("dynamic");
280         if(str != null) {
281             dynamic=Boolean.valueOf(str).booleanValue();
282             props.remove("dynamic");
283         }
284
285         str=props.getProperty("shun");
286         if(str != null) {
287             shun=Boolean.valueOf(str).booleanValue();
288             props.remove("shun");
289         }
290
291         str=props.getProperty("gossip_interval");
292         if(str != null) {
293             gossip_interval=Long.parseLong(str);
294             props.remove("gossip_interval");
295         }
296
297         str=props.getProperty("mcast_gossip");
298         if(str != null) {
299             mcast_gossip=Boolean.valueOf(str).booleanValue();
300             props.remove("mcast_gossip");
301         }
302
303         str=props.getProperty("subset");
304         if(str != null) {
305             subset=Double.parseDouble(str);
306             props.remove("subset");
307         }
308
309         str=props.getProperty("desired_avg_gossip");
310         if(str != null) {
311             desired_avg_gossip=Long.parseLong(str);
312             props.remove("desired_avg_gossip");
313         }
314
315         str=props.getProperty("max_queue");
316         if(str != null) {
317             max_queue=Integer.parseInt(str);
318             props.remove("max_queue");
319         }
320
321         str=props.getProperty("max_gossip_cache");
322         if(str != null) {
323             max_gossip_cache=Integer.parseInt(str);
324             props.remove("max_gossip_cache");
325         }
326
327         str=props.getProperty("gc_lag");
328         if(str != null) {
329             gc_lag=Integer.parseInt(str);
330             props.remove("gc_lag");
331         }
332
333         if(props.size() > 0) {
334             System.err.println("PBCAST.setProperties(): the following properties are not recognized:");
335             props.list(System.out);
336             return false;
337         }
338         return true;
339     }
340
341
342
343     /* --------------------------------- Private Methods --------------------------------------------- */
344
345
346     /**
347      Ensures that FIFO is observed for all messages for a certain member. The NakReceiverWindow corresponding
348      to a certain sender is looked up in a hashtable. Then, the message is added to the NakReceiverWindow.
349      As many messages as possible are then removed from the table and passed up.
350      */

351     void handleUpMessage(Message m, PbcastHeader hdr) {
352         Address sender=m.getSrc();
353         NakReceiverWindow win=null;
354         Message tmpmsg;
355         long seqno=hdr.seqno;
356
357         if(sender == null) {
358             if(log.isErrorEnabled()) log.error("sender is null");
359             return;
360         }
361
362         synchronized(digest) {
363             win=(NakReceiverWindow) digest.get(sender);
364             if(win == null) {
365                 if(log.isWarnEnabled()) log.warn("NakReceiverWindow for sender " + sender +
366                                                        " not found. Creating new NakReceiverWindow starting at seqno=" + seqno);
367                 win=new NakReceiverWindow(sender, seqno);
368                 digest.put(sender, win);
369             }
370
371             // *************************************
372
// The header was removed before, so we add it again for the NakReceiverWindow. When there is a
373
// retransmission request, the header will already be attached to the message (both message and
374
// header are *copied* into delivered_msgs when a message is removed from NakReceiverWindow).
375
// *************************************
376
m.putHeader(getName(), hdr);
377             win.add(seqno, m);
378
379
380                 if(log.isInfoEnabled()) log.info("receiver window for " + sender + " is " + win);
381
382             // Try to remove as many message as possible and send them up the stack
383
while((tmpmsg=win.remove()) != null) {
384                 tmpmsg.removeHeader(getName()); // need to remove header again, so upper protocols don't get confused
385
passUp(new Event(Event.MSG, tmpmsg));
386             }
387
388
389             // Garbage collect messages if singleton member (because then we won't receive any gossips, triggering
390
// garbage collection)
391
if(members.size() == 1) {
392                 seqno=Math.max(seqno - gc_lag, 0);
393                 if(seqno <= 0) {
394                 }
395                 else {
396
397                         if(log.isInfoEnabled()) log.info("deleting messages < " +
398                                                                seqno + " from " + sender);
399                     win.stable(seqno);
400                 }
401             }
402         }
403     }
404
405
406     /**
407      * Returns for each sender the 'highest seen' seqno from the digest. Highest seen means the
408      * highest seqno without any gaps, e.g. if for a sender P the messages 2 3 4 6 7 were received,
409      * then only 2, 3 and 4 can be delivered, so 4 is the highest seen. 6 and 7 cannot because there
410      * 5 is missing. If there are no message, the highest seen seqno is -1.
411      */

412     Digest getDigest() {
413         Digest ret=new Digest(digest.size());
414         long highest_seqno, lowest_seqno;
415         Address key;
416         NakReceiverWindow win;
417
418         for(Enumeration JavaDoc e=digest.keys(); e.hasMoreElements();) {
419             key=(Address) e.nextElement();
420             win=(NakReceiverWindow) digest.get(key);
421             lowest_seqno=win.getLowestSeen();
422             highest_seqno=win.getHighestSeen();
423             ret.add(key, lowest_seqno, highest_seqno);
424         }
425
426         if(log.isInfoEnabled()) log.info("digest is " + ret);
427
428         return ret;
429     }
430
431
432     /**
433      * Sets (or resets) the contents of the 'digest' table. Its current messages will be deleted and the
434      * NakReceiverTables reset.
435      */

436     void setDigest(Digest d) {
437         NakReceiverWindow win;
438         Address sender;
439         long seqno=1;
440
441         synchronized(digest) {
442             for(Enumeration JavaDoc e=digest.elements(); e.hasMoreElements();) {
443                 win=(NakReceiverWindow) e.nextElement();
444                 win.reset();
445             }
446             digest.clear();
447             for(int i=0; i < d.size(); i++) {
448                 sender=d.senderAt(i);
449                 seqno=d.highSeqnoAt(i);
450                 if(sender == null) {
451                     if(log.isErrorEnabled()) log.error("cannot set item because sender is null");
452                     continue;
453                 }
454                 digest.put(sender, new NakReceiverWindow(sender, seqno + 1)); // next to expect, digest had *last* seen !
455
}
456
457         }
458     }
459
460
461     String JavaDoc printDigest() {
462         long highest_seqno;
463         Address key;
464         NakReceiverWindow win;
465         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
466
467         for(Enumeration JavaDoc e=digest.keys(); e.hasMoreElements();) {
468             key=(Address) e.nextElement();
469             win=(NakReceiverWindow) digest.get(key);
470             highest_seqno=win.getHighestSeen();
471             sb.append(key + ": " + highest_seqno + '\n');
472         }
473         return sb.toString();
474     }
475
476
477     String JavaDoc printIncomingMessageQueue() {
478         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
479         NakReceiverWindow win;
480
481         win=(NakReceiverWindow) digest.get(local_addr);
482         sb.append(win);
483         return sb.toString();
484     }
485
486
487     void startGossipThread() {
488         if(gossip_thread == null) {
489             gossip_thread=new Thread JavaDoc(this);
490             gossip_thread.setDaemon(true);
491             gossip_thread.start();
492         }
493     }
494
495
496     void stopGossipThread() {
497         Thread JavaDoc tmp;
498
499         if(gossip_thread != null) {
500             if(gossip_thread.isAlive()) {
501                 tmp=gossip_thread;
502                 gossip_thread=null;
503                 tmp.interrupt();
504                 tmp=null;
505             }
506         }
507         gossip_thread=null;
508     }
509
510
511     void startGossipHandler() {
512         if(gossip_handler == null) {
513             gossip_handler=new GossipHandler(gossip_queue);
514             gossip_handler.start();
515         }
516     }
517
518     void stopGossipHandler() {
519         if(gossip_handler != null) {
520             gossip_handler.stop();
521             gossip_handler=null;
522         }
523     }
524
525
526     /**
527      * Send a gossip message with a message digest of the highest seqnos seen per sender to a subset
528      * of the current membership. Exclude self (I receive all mcasts sent by myself).
529      */

530     void sendGossip() {
531         Vector JavaDoc current_mbrs=(Vector JavaDoc) members.clone();
532         Vector JavaDoc subset_mbrs=null;
533         Gossip gossip=null;
534         Message msg;
535         Address dest;
536         PbcastHeader hdr;
537
538
539         if(local_addr != null)
540             current_mbrs.remove(local_addr); // don't pick myself
541

542         if(mcast_gossip) { // send gossip to all members using a multicast
543
gossip=new Gossip(local_addr, gossip_round, getDigest().copy(), null); // not_seen list is null, prevents forwarding
544
for(int i=0; i < current_mbrs.size(); i++) // all members have seen this gossip. Used for garbage collection
545
gossip.addToSeenList((Address) current_mbrs.elementAt(i));
546             hdr=new PbcastHeader(gossip, PbcastHeader.GOSSIP);
547             msg=new Message(); // null dest == multicast to all members
548
msg.putHeader(getName(), hdr);
549
550
551                 if(log.isInfoEnabled()) log.info("(from " + local_addr +
552                            ") multicasting gossip " + gossip.shortForm() + " to all members");
553
554             passDown(new Event(Event.MSG, msg));
555         }
556         else {
557             subset_mbrs=Util.pickSubset(current_mbrs, subset);
558
559             for(int i=0; i < subset_mbrs.size(); i++) {
560                 gossip=new Gossip(local_addr, gossip_round, getDigest().copy(), (Vector JavaDoc) current_mbrs.clone());
561                 gossip.addToSeenList(local_addr);
562                 hdr=new PbcastHeader(gossip, PbcastHeader.GOSSIP);
563                 dest=(Address) subset_mbrs.elementAt(i);
564                 msg=new Message(dest, null, null);
565                 msg.putHeader(getName(), hdr);
566
567
568                     if(log.isInfoEnabled()) log.info("(from " + local_addr +
569                                ") sending gossip " + gossip.shortForm() + " to " + subset_mbrs);
570
571                 passDown(new Event(Event.MSG, msg));
572             }
573         }
574
575         gossip_round++;
576     }
577
578
579     /**
580      * MOST IMPORTANT METHOD IN THIS CLASS !! This guy really decides how a gossip reaches all members,
581      * or whether it will flood the network !<p>
582      * Scrutinize the gossip received and request retransmission of messages that we haven't received yet.
583      * A gossip has a digest which carries for each sender the lowest and highest seqno seen. We check
584      * this range against our own digest and request retransmission of missing messages if needed.<br>
585      * <em>See DESIGN for a description of this method</em>
586      */

587     void handleGossip(Gossip gossip) {
588         long my_low=0, my_high=0, their_low, their_high;
589         Hashtable JavaDoc ht=null;
590         Digest their_digest;
591         Address sender=null;
592         NakReceiverWindow win;
593         Message msg;
594         Address dest;
595         Vector JavaDoc new_dests;
596         PbcastHeader hdr;
597         List missing_msgs; // list of missing messages (for retransmission) (List of Longs)
598

599
600
601             if(log.isInfoEnabled()) log.info("(from " + local_addr +
602                                                 ") received gossip " + gossip.shortForm() + " from " + gossip.sender);
603
604
605         if(gossip == null || gossip.digest == null) {
606             if(log.isWarnEnabled()) log.warn("gossip is null or digest is null");
607             return;
608         }
609
610
611         /* 1. If gossip sender is null, we cannot ask it for missing messages anyway, so discard gossip ! */
612         if(gossip.sender == null) {
613             if(log.isErrorEnabled()) log.error("sender of gossip is null; " +
614                                                  "don't know where to send XMIT_REQ to. Discarding gossip");
615             return;
616         }
617
618
619         /* 2. Don't process the gossip if the sender of the gossip is not a member anymore. If it is a newly
620            joined member, discard it as well (we can't tell the difference). When the new member will be
621            added to the membership, then its gossips will be processed */

622         if(!members.contains(gossip.sender)) {
623             if(log.isWarnEnabled()) log.warn("sender " + gossip.sender +
624                                                 " is not a member. Gossip will not be processed");
625             if(shun)
626                 shunInvalidGossiper(gossip.sender);
627             return;
628         }
629
630
631         /* 3. If this gossip was received before, just discard it and return (don't process the
632            same gossip twice). This prevents flooding of the gossip sender with retransmission reqs */

633         while(gossip_list.size() >= max_gossip_cache) // first delete oldest gossips
634
gossip_list.removeFromHead();
635
636         if(gossip_list.contains(gossip)) // already received, don't re-broadcast
637
return;
638         else
639             gossip_list.add(gossip.copy()); // add to list of received gossips
640

641
642
643         /* 4. Send a HEARD_FROM event containing all members in the gossip-chain down to the FD layer.
644            This ensures that we don't suspect them */

645         seen_list=gossip.getSeenList();
646         if(seen_list.size() > 0)
647             passDown(new Event(Event.HEARD_FROM, seen_list.clone()));
648
649
650
651         /* 5. Compare their digest against ours. Find out if some messages in the their digest are
652            not in our digest. If yes, put them in the 'ht' hashtable for retransmission */

653         their_digest=gossip.digest;
654         for(int i=0; i < their_digest.size(); i++) {
655             sender=their_digest.senderAt(i);
656             their_low=their_digest.lowSeqnoAt(i);
657             their_high=their_digest.highSeqnoAt(i);
658
659             if(their_low == 0 && their_high == 0)
660                 continue; // won't have any messages for this sender, don't even re-send
661

662             win=(NakReceiverWindow) digest.get(sender);
663             if(win == null) {
664                 // this specific sender in this digest is probably not a member anymore, new digests
665
// won't contain it. for now, just ignore it. if it is a new member, it will be in the next
666
// gossips
667

668                     if(log.isWarnEnabled()) log.warn("sender " + sender + " not found, skipping...");
669                 continue;
670             }
671
672             my_low=win.getLowestSeen();
673             my_high=win.getHighestSeen();
674             if(my_high < their_high) {
675                 // changed by Bela (June 26 2003) - replaced my_high with my_low (not tested though !)
676
if(my_low + 1 < their_low) {
677                     ;
678                 }
679                 else {
680                     missing_msgs=win.getMissingMessages(my_high, their_high);
681                     if(missing_msgs != null) {
682                         if(log.isInfoEnabled())
683                             log.info("asking " + gossip.sender + " for retransmission of " +
684                                     sender + ", missing messages: " + missing_msgs + "\nwin for " + sender + ":\n" + win + '\n');
685                         if(ht == null) ht=new Hashtable JavaDoc();
686                         ht.put(sender, missing_msgs);
687                     }
688                 }
689             }
690         }
691
692
693
694         /* 6. Send a XMIT_REQ to the sender of the gossip. The sender will then resend those messages as
695            an XMIT_RSP unicast message (the messages are in its buffer, as a List) */

696         if(ht == null || ht.size() == 0) {
697         }
698         else {
699             hdr=new PbcastHeader(PbcastHeader.XMIT_REQ);
700             hdr.xmit_reqs=ht;
701
702                 if(log.isInfoEnabled()) log.info("sending XMIT_REQ to " + gossip.sender);
703             msg=new Message(gossip.sender, null, null);
704             msg.putHeader(getName(), hdr);
705             passDown(new Event(Event.MSG, msg));
706         }
707
708
709
710         /* 7. Remove myself from 'not_seen' list. If not_seen list is empty, we can garbage-collect messages
711            smaller than the digest. Since all the members have seen the gossip, it will not be re-sent */

712         gossip.removeFromNotSeenList(local_addr);
713         if(gossip.sizeOfNotSeenList() == 0) {
714             garbageCollect(gossip.digest);
715             return;
716         }
717
718
719
720         /* 8. If we make it to this point, re-send to subset of remaining members in 'not_seen' list */
721         new_dests=Util.pickSubset(gossip.getNotSeenList(), subset);
722
723
724             if(log.isInfoEnabled()) log.info("(from " + local_addr +
725                                                 ") forwarding gossip " + gossip.shortForm() + " to " + new_dests);
726         gossip.addToSeenList(local_addr);
727         for(int i=0; i < new_dests.size(); i++) {
728             dest=(Address) new_dests.elementAt(i);
729             msg=new Message(dest, null, null);
730             hdr=new PbcastHeader(gossip.copy(), PbcastHeader.GOSSIP);
731             msg.putHeader(getName(), hdr);
732             passDown(new Event(Event.MSG, msg));
733         }
734     }
735
736
737     /**
738      * Find the messages indicated in <code>xmit_reqs</code> and re-send them to
739      * <code>requester</code>
740      */

741     void handleXmitRequest(Address requester, Hashtable JavaDoc xmit_reqs) {
742         NakReceiverWindow win;
743         Address sender;
744         List msgs, missing_msgs, xmit_msgs;
745         Message msg;
746
747         if(requester == null) {
748             if(log.isErrorEnabled()) log.error("requester is null");
749             return;
750         }
751
752          {
753             if(log.isInfoEnabled()) log.info("retransmission requests are " + printXmitReqs(xmit_reqs));
754         }
755         for(Enumeration JavaDoc e=xmit_reqs.keys(); e.hasMoreElements();) {
756             sender=(Address) e.nextElement();
757             win=(NakReceiverWindow) digest.get(sender);
758             if(win == null) {
759                 if(log.isWarnEnabled()) log.warn("sender " + sender +
760                                                          " not found in my digest; skipping retransmit request !");
761                 continue;
762             }
763
764             missing_msgs=(List) xmit_reqs.get(sender);
765             msgs=win.getMessagesInList(missing_msgs); // msgs to be sent back to requester
766

767
768
769             // re-send the messages to requester. don't add a header since they already have headers
770
// (when added to the NakReceiverWindow, the headers were not removed)
771
xmit_msgs=new List();
772             for(Enumeration JavaDoc en=msgs.elements(); en.hasMoreElements();) {
773                 msg=((Message) en.nextElement()).copy();
774                 xmit_msgs.add(msg);
775             }
776
777             // create a msg with the List of xmit_msgs as contents, add header
778
msg=new Message(requester, null, xmit_msgs);
779             msg.putHeader(getName(), new PbcastHeader(PbcastHeader.XMIT_RSP));
780             passDown(new Event(Event.MSG, msg));
781         }
782     }
783
784
785     void handleXmitRsp(List xmit_msgs) {
786         Message m;
787         PbcastHeader hdr;
788
789         for(Enumeration JavaDoc e=xmit_msgs.elements(); e.hasMoreElements();) {
790             m=(Message) e.nextElement();
791             hdr=(PbcastHeader) m.removeHeader(getName());
792             if(hdr == null) {
793                 log.warn("header is null, ignoring message");
794             }
795             else {
796                 if(log.isInfoEnabled()) log.info("received #" + hdr.seqno + ", type=" +
797                         PbcastHeader.type2String(hdr.type) + ", msg=" + m);
798                 handleUpMessage(m, hdr);
799             }
800         }
801     }
802
803
804     String JavaDoc printXmitReqs(Hashtable JavaDoc xmit_reqs) {
805         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
806         Address key;
807         boolean first=true;
808
809         if(xmit_reqs == null)
810             return "<null>";
811
812         for(Enumeration JavaDoc e=xmit_reqs.keys(); e.hasMoreElements();) {
813             key=(Address) e.nextElement();
814             if(!first) {
815                 sb.append(", ");
816             }
817             else
818                 first=false;
819             sb.append(key + ": " + xmit_reqs.get(key));
820         }
821         return sb.toString();
822     }
823
824
825     void garbageCollect(Digest gc) {
826         Address sender;
827         long seqno;
828         NakReceiverWindow win;
829
830         for(int i=0; i < gc.size(); i++) {
831             sender=gc.senderAt(i);
832             win=(NakReceiverWindow) digest.get(sender);
833             if(win == null) {
834                 if(log.isDebugEnabled()) log.debug("sender " + sender +
835                                                        " not found in our message digest, skipping");
836                 continue;
837             }
838             seqno=gc.highSeqnoAt(i);
839             seqno=Math.max(seqno - gc_lag, 0);
840             if(seqno <= 0) {
841                 continue;
842             }
843
844                 if(log.isInfoEnabled()) log.info("(from " + local_addr +
845                                                       ") GC: deleting messages < " + seqno + " from " + sender);
846             win.stable(seqno);
847         }
848     }
849
850
851     /**
852      * If sender of gossip is not a member, send a NOT_MEMBER to sender (after n gossips received).
853      * This will cause that member to leave the group and possibly re-join.
854      */

855     void shunInvalidGossiper(Address invalid_gossiper) {
856         int num_pings=0;
857         Message shun_msg;
858
859         if(invalid_gossipers.containsKey(invalid_gossiper)) {
860             num_pings=((Integer JavaDoc) invalid_gossipers.get(invalid_gossiper)).intValue();
861             if(num_pings >= max_invalid_gossips) {
862
863                     if(log.isInfoEnabled()) log.info("sender " + invalid_gossiper +
864                                                                " is not member of " + members + " ! Telling it to leave group");
865                 shun_msg=new Message(invalid_gossiper, null, null);
866                 shun_msg.putHeader(getName(), new PbcastHeader(PbcastHeader.NOT_MEMBER));
867                 passDown(new Event(Event.MSG, shun_msg));
868                 invalid_gossipers.remove(invalid_gossiper);
869             }
870             else {
871                 num_pings++;
872                 invalid_gossipers.put(invalid_gossiper, new Integer JavaDoc(num_pings));
873             }
874         }
875         else {
876             num_pings++;
877             invalid_gossipers.put(invalid_gossiper, new Integer JavaDoc(num_pings));
878         }
879     }
880
881
882     /** Computes the gossip_interval. See DESIGN for details */
883     long computeGossipInterval(int num_mbrs, double desired_avg_gossip) {
884         return getRandom((long) (num_mbrs * desired_avg_gossip * 2));
885     }
886
887
888     long getRandom(long range) {
889         return (long) ((Math.random() * range) % range);
890     }
891
892
893     /* ------------------------------- End of Private Methods ---------------------------------------- */
894
895
896     private static class GossipEntry {
897         PbcastHeader hdr=null;
898         Address sender=null;
899         byte[] data=null;
900
901         GossipEntry(PbcastHeader hdr, Address sender, byte[] data) {
902             this.hdr=hdr;
903             this.sender=sender;
904             this.data=data;
905         }
906
907         public String JavaDoc toString() {
908             return "hdr=" + hdr + ", sender=" + sender + ", data=" + data;
909         }
910     }
911
912
913     /**
914      Handles gossip and retransmission requests. Removes requests from a (bounded) queue.
915      */

916     private class GossipHandler implements Runnable JavaDoc {
917         Thread JavaDoc t=null;
918         final Queue gossip_queue;
919
920
921         GossipHandler(Queue q) {
922             gossip_queue=q;
923         }
924
925
926         void start() {
927             if(t == null) {
928                 t=new Thread JavaDoc(this, "PBCAST.GossipHandlerThread");
929                 t.setDaemon(true);
930                 t.start();
931             }
932         }
933
934
935         void stop() {
936             Thread JavaDoc tmp;
937             if(t != null && t.isAlive()) {
938                 tmp=t;
939                 t=null;
940                 if(gossip_queue != null)
941                     gossip_queue.close(false); // don't flush elements
942
tmp.interrupt();
943             }
944             t=null;
945         }
946
947
948         public void run() {
949             GossipEntry entry;
950             PbcastHeader hdr;
951             List xmit_msgs;
952             byte[] data;
953
954             while(t != null && gossip_queue != null) {
955                 try {
956                     entry=(GossipEntry) gossip_queue.remove();
957                     hdr=entry.hdr;
958                     if(hdr == null) {
959                         if(log.isErrorEnabled()) log.error("gossip entry has no PbcastHeader");
960                         continue;
961                     }
962
963                     switch(hdr.type) {
964
965                         case PbcastHeader.GOSSIP:
966                             handleGossip(hdr.gossip);
967                             break;
968
969                         case PbcastHeader.XMIT_REQ:
970                             if(hdr.xmit_reqs == null) {
971                                 if(log.isWarnEnabled()) log.warn("request is null !");
972                                 break;
973                             }
974                             handleXmitRequest(entry.sender, hdr.xmit_reqs);
975                             break;
976
977                         case PbcastHeader.XMIT_RSP:
978                             data=entry.data;
979                             if(data == null) {
980                                 if(log.isWarnEnabled()) log.warn("buffer is null (no xmitted msgs)");
981                                 break;
982                             }
983                             try {
984                                 xmit_msgs=(List) Util.objectFromByteBuffer(data);
985                             }
986                             catch(Exception JavaDoc ex) {
987                                 if(log.isErrorEnabled()) log.error(ex.getMessage());
988                                 break;
989                             }
990                             handleXmitRsp(xmit_msgs);
991                             break;
992
993                         case PbcastHeader.NOT_MEMBER: // we are shunned
994
if(shun) {
995                                 if(log.isInfoEnabled()) log.info("I am being shunned. Will leave and re-join");
996                                 passUp(new Event(Event.EXIT));
997                             }
998                             break;
999
1000                        default:
1001                            if(log.isErrorEnabled()) log.error("type (" + hdr.type +
1002                                                                         ") of PbcastHeader not known !");
1003                            return;
1004                    }
1005                }
1006                catch(QueueClosedException closed) {
1007                    break;
1008                }
1009            }
1010        }
1011    }
1012
1013}
1014
1015
Popular Tags