KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > pbcast > STABLE


1 // $Id: STABLE.java,v 1.19 2005/04/14 17:04:17 belaban Exp $
2

3 package org.jgroups.protocols.pbcast;
4
5
6 import org.jgroups.*;
7 import org.jgroups.stack.Protocol;
8 import org.jgroups.util.Promise;
9 import org.jgroups.util.TimeScheduler;
10 import org.jgroups.util.Util;
11 import org.jgroups.util.Streamable;
12
13 import java.io.*;
14 import java.util.Properties JavaDoc;
15 import java.util.Vector JavaDoc;
16
17
18
19
20 /**
21  * Computes the broadcast messages that are stable, i.e. have been received by all members. Sends
22  * STABLE events up the stack when this is the case. This allows NAKACK to garbage collect messages that
23  * have been seen by all members.<p>
24  * Works as follows: periodically we mcast our highest seqnos (seen for each member) to the group.
25  * A stability vector, which maintains the highest seqno for each member and initially contains no data,
26  * is updated when such a message is received. The entry for a member P is computed set to
27  * min(entry[P], digest[P]). When messages from all members have been received, a stability
28  * message is mcast, which causes all members to send a STABLE event up the stack (triggering garbage collection
29  * in the NAKACK layer).<p>
30  * The stable task now terminates after max_num_gossips if no messages or view changes have been sent or received
31  * in the meantime. It will resume when messages are received. This effectively suspends sending superfluous
32  * STABLE messages in the face of no activity.<br/>
33  * New: when <code>max_bytes</code> is exceeded (unless disabled by setting it to 0),
34  * a STABLE task will be started (unless it is already running).
35  * @author Bela Ban
36  */

37 public class STABLE extends Protocol {
38     Address local_addr=null;
39     final Vector JavaDoc mbrs=new Vector JavaDoc();
40     final Digest digest=new Digest(); // keeps track of the highest seqnos from all members
41
final Promise digest_promise=new Promise(); // for fetching digest (from NAKACK layer)
42
final Vector JavaDoc heard_from=new Vector JavaDoc(); // keeps track of who we already heard from (STABLE_GOSSIP msgs)
43
long digest_timeout=60000; // time to wait until digest is received (from NAKACK)
44

45     /** Sends a STABLE gossip every 20 seconds on average. 0 disables gossipping of STABLE messages */
46     long desired_avg_gossip=20000;
47
48     /** delay before we send STABILITY msg (give others a change to send first). This should be set to a very
49      * small number (> 0 !) if <code>max_bytes</code> is used */

50     long stability_delay=6000;
51     StabilitySendTask stability_task=null;
52     final Object JavaDoc stability_mutex=new Object JavaDoc(); // to synchronize on stability_task
53
StableTask stable_task=null; // bcasts periodic STABLE message (added to timer below)
54
final Object JavaDoc stable_task_mutex=new Object JavaDoc(); // to sync on stable_task
55
TimeScheduler timer=null; // to send periodic STABLE msgs (and STABILITY messages)
56
int max_gossip_runs=3; // max. number of times the StableTask runs before terminating
57
int num_gossip_runs=max_gossip_runs; // this number is decremented (max_gossip_runs doesn't change)
58
static final String JavaDoc name="STABLE";
59
60     /** Total amount of bytes from incoming messages (default = 0 = disabled). When exceeded, a STABLE
61      * message will be broadcast and <code>num_bytes_received</code> reset to 0 . If this is > 0, then ideally
62      * <code>stability_delay</code> should be set to a low number as well */

63     long max_bytes=0;
64
65     /** The total number of bytes received from unicast and multicast messages */
66     long num_bytes_received=0;
67
68     /** When true, don't take part in garbage collection protocol: neither send STABLE messages nor
69      * handle STABILITY messages */

70     boolean suspended=false;
71
72     /** Max time we should hold off on message garbage collection. This is a second line of defense in case
73      * we get a SUSPEND_STABLE, but forget to send a corresponding RESUME_STABLE (which should never happen !)
74      * The consequence of a missing RESUME_STABLE would be that the group doesn't garbage collect stable
75      * messages anymore, eventually, with a lot of traffic, every member would accumulate messages and run
76      * out of memory !
77      */

78     // long max_suspend_time=600000;
79

80     ResumeTask resume_task=null;
81     final Object JavaDoc resume_task_mutex=new Object JavaDoc();
82
83
84     public String JavaDoc getName() {
85         return name;
86     }
87
88
89     public Vector JavaDoc requiredDownServices() {
90         Vector JavaDoc retval=new Vector JavaDoc();
91         retval.addElement(new Integer JavaDoc(Event.GET_DIGEST_STABLE)); // NAKACK layer
92
return retval;
93     }
94
95     public boolean setProperties(Properties JavaDoc props) {
96         String JavaDoc str;
97
98         super.setProperties(props);
99         str=props.getProperty("digest_timeout");
100         if(str != null) {
101             digest_timeout=Long.parseLong(str);
102             props.remove("digest_timeout");
103         }
104
105         str=props.getProperty("desired_avg_gossip");
106         if(str != null) {
107             desired_avg_gossip=Long.parseLong(str);
108             props.remove("desired_avg_gossip");
109         }
110
111         str=props.getProperty("stability_delay");
112         if(str != null) {
113             stability_delay=Long.parseLong(str);
114             props.remove("stability_delay");
115         }
116
117         str=props.getProperty("max_gossip_runs");
118         if(str != null) {
119             max_gossip_runs=Integer.parseInt(str);
120             num_gossip_runs=max_gossip_runs;
121             props.remove("max_gossip_runs");
122         }
123
124         str=props.getProperty("max_bytes");
125         if(str != null) {
126             max_bytes=Long.parseLong(str);
127             props.remove("max_bytes");
128         }
129
130         str=props.getProperty("max_suspend_time");
131         if(str != null) {
132             System.err.println("max_suspend_time is not supported any longer; please remove it (ignoring it)");
133             props.remove("max_suspend_time");
134         }
135
136         if(props.size() > 0) {
137             System.err.println("STABLE.setProperties(): these properties are not recognized:");
138             props.list(System.out);
139             return false;
140         }
141         return true;
142     }
143
144
145     void suspend(long timeout) {
146         if(!suspended) {
147             suspended=true;
148             if(log.isDebugEnabled())
149                 log.debug("suspending message garbage collection");
150         }
151         startResumeTask(timeout); // will not start task if already running
152
}
153
154     void resume() {
155         suspended=false;
156         if(log.isDebugEnabled())
157             log.debug("resuming message garbage collection");
158         stopResumeTask();
159     }
160
161     public void start() throws Exception JavaDoc {
162         if(stack != null && stack.timer != null)
163             timer=stack.timer;
164         else
165             throw new Exception JavaDoc("STABLE.up(): timer cannot be retrieved from protocol stack");
166     }
167
168     public void stop() {
169         stopStableTask();
170     }
171
172
173     public void up(Event evt) {
174         Message msg;
175         StableHeader hdr;
176         Header obj;
177         int type=evt.getType();
178
179         switch(type) {
180
181         case Event.MSG:
182             msg=(Message)evt.getArg();
183
184             if(max_bytes > 0) { // message counting is enabled
185
long size=Math.max(msg.getLength(), 24);
186                 num_bytes_received+=size;
187                 if(num_bytes_received >= max_bytes) {
188                     if(log.isTraceEnabled()) {
189                         StringBuffer JavaDoc sb=new StringBuffer JavaDoc("max_bytes has been exceeded (max_bytes=");
190                         sb.append(max_bytes).append(", number of bytes received=");
191                         sb.append(num_bytes_received).append("): sending STABLE message");
192                         log.trace(sb.toString());
193                     }
194
195                     new Thread JavaDoc() {
196                         public void run() {
197                             initialize();
198                             sendStableMessage();
199                         }
200                     }.start();
201                     num_bytes_received=0;
202                 }
203             }
204
205             obj=msg.getHeader(name);
206             if(obj == null || !(obj instanceof StableHeader))
207                 break;
208             hdr=(StableHeader)msg.removeHeader(name);
209             switch(hdr.type) {
210             case StableHeader.STABLE_GOSSIP:
211                 handleStableGossip(msg.getSrc(), hdr.stableDigest);
212                 break;
213             case StableHeader.STABILITY:
214                 handleStabilityMessage(hdr.stableDigest);
215                 break;
216             default:
217                 if(log.isErrorEnabled()) log.error("StableHeader type " + hdr.type + " not known");
218             }
219             return; // don't pass STABLE or STABILITY messages up the stack
220

221         case Event.SET_LOCAL_ADDRESS:
222             local_addr=(Address)evt.getArg();
223             break;
224         }
225
226         passUp(evt);
227         if(desired_avg_gossip > 0) {
228             if(type == Event.VIEW_CHANGE || type == Event.MSG)
229                 startStableTask(); // only starts task if not yet running
230
}
231     }
232
233
234     /**
235      * We need to receive this event out-of-band, otherwise we would block. The use case is
236      * <ol>
237      * <li>To send a STABLE_GOSSIP message we need the digest (from NAKACK below)
238      * <li>We send a GET_DIGEST_STABLE event down <em>from the up() method</em>
239      * <li>NAKACK sends the GET_DIGEST_STABLE_OK backup. <em>However, we may have other messages in the
240      * up queue ahead of this event !</em> Therefore the event cannot be processed until all messages ahead of
241      * the event have been processed. These can't be processed, however, because the up() call waits for
242      * GET_DIGEST_STABLE_OK ! The up() call would always run into the timeout.<be/>
243      * Having out-of-band reception of just this one event eliminates the problem.
244      * </ol>
245      * @param evt
246      */

247     protected void receiveUpEvent(Event evt) {
248         if(evt.getType() == Event.GET_DIGEST_STABLE_OK) {
249             digest_promise.setResult(evt.getArg());
250             return;
251         }
252         super.receiveUpEvent(evt);
253     }
254
255
256     public void down(Event evt) {
257         int type=evt.getType();
258
259         switch(evt.getType()) {
260             case Event.VIEW_CHANGE:
261                 View v=(View)evt.getArg();
262                 Vector JavaDoc tmp=v.getMembers();
263                 mbrs.removeAllElements();
264                 mbrs.addAll(tmp);
265                 heard_from.retainAll(tmp); // removes all elements from heard_from that are not in new view
266
stopStableTask();
267                 break;
268
269             case Event.SUSPEND_STABLE:
270                 long timeout=0;
271                 Object JavaDoc t=evt.getArg();
272                 if(t != null && t instanceof Long JavaDoc)
273                     timeout=((Long JavaDoc)t).longValue();
274                 stopStableTask();
275                 suspend(timeout);
276                 break;
277
278             case Event.RESUME_STABLE:
279                 resume();
280                 break;
281         }
282
283         if(desired_avg_gossip > 0) {
284             if(type == Event.VIEW_CHANGE || type == Event.MSG)
285                 startStableTask(); // only starts task if not yet running
286
}
287
288         passDown(evt);
289     }
290
291
292
293
294
295     /* --------------------------------------- Private Methods ---------------------------------------- */
296
297     void initialize() {
298         synchronized(digest) {
299             digest.reset(mbrs.size());
300             for(int i=0; i < mbrs.size(); i++)
301                 digest.add((Address)mbrs.elementAt(i), -1, -1);
302             heard_from.removeAllElements();
303             heard_from.addAll(mbrs);
304         }
305     }
306
307
308     void startStableTask() {
309         num_gossip_runs=max_gossip_runs;
310
311         // Here, double-checked locking works: we don't want to synchronize if the task already runs (which is the case
312
// 99% of the time). If stable_task gets nulled after the condition check, we return anyways, but just miss
313
// 1 cycle: on the next message or view, we will start the task
314
if(stable_task != null)
315             return;
316         synchronized(stable_task_mutex) {
317             if(stable_task != null && stable_task.running()) {
318                 return; // already running
319
}
320             stable_task=new StableTask();
321             timer.add(stable_task, true); // fixed-rate scheduling
322
}
323         if(log.isTraceEnabled())
324             log.trace("stable task started; num_gossip_runs=" + num_gossip_runs + ", max_gossip_runs=" + max_gossip_runs);
325     }
326
327
328     void stopStableTask() {
329         // contrary to startStableTask(), we don't need double-checked locking here because this method is not
330
// called frequently
331
synchronized(stable_task_mutex) {
332             if(stable_task != null) {
333                 stable_task.stop();
334                 stable_task=null;
335             }
336         }
337     }
338
339
340     void startResumeTask(long max_suspend_time) {
341         max_suspend_time=(long)(max_suspend_time * 1.1); // little slack
342

343         synchronized(resume_task_mutex) {
344             if(resume_task != null && resume_task.running()) {
345                 return; // already running
346
}
347             else {
348                 resume_task=new ResumeTask(max_suspend_time);
349                 timer.add(resume_task, true); // fixed-rate scheduling
350
}
351         }
352         if(log.isDebugEnabled())
353             log.debug("resume task started, max_suspend_time=" + max_suspend_time);
354     }
355
356
357     void stopResumeTask() {
358         synchronized(resume_task_mutex) {
359             if(resume_task != null) {
360                 resume_task.stop();
361                 resume_task=null;
362             }
363         }
364     }
365
366
367     void startStabilityTask(Digest d, long delay) {
368         synchronized(stability_mutex) {
369             if(stability_task != null && stability_task.running()) {
370                 return; // already running
371
}
372             else {
373                 stability_task=new StabilitySendTask(d, delay);
374                 timer.add(stability_task, true); // fixed-rate scheduling
375
}
376         }
377     }
378
379
380     void stopStabilityTask() {
381         synchronized(stability_mutex) {
382             if(stability_task != null) {
383                 stability_task.stop();
384                 stability_task=null;
385             }
386         }
387     }
388
389
390     /**
391      Digest d contains (a) the highest seqnos <em>deliverable</em> for each sender and (b) the highest seqnos
392      <em>seen</em> for each member. (Difference: with 1,2,4,5, the highest seqno seen is 5, whereas the highest
393      seqno deliverable is 2). The minimum of all highest seqnos deliverable will be taken to send a stability
394      message, which results in garbage collection of messages lower than the ones in the stability vector. The
395      maximum of all seqnos will be taken to trigger possible retransmission of last missing seqno (see DESIGN
396      for details).
397      */

398     void handleStableGossip(Address sender, Digest d) {
399         Address mbr;
400         long highest_seqno, my_highest_seqno;
401         long highest_seen_seqno, my_highest_seen_seqno;
402
403         if(d == null || sender == null) {
404             if(log.isErrorEnabled()) log.error("digest or sender is null");
405             return;
406         }
407
408         if(suspended) {
409             if(log.isDebugEnabled()) {
410                 log.debug("STABLE message will not be handled as suspended=" + suspended);
411             }
412             return;
413         }
414
415         if(log.isDebugEnabled()) log.debug("received digest " + printStabilityDigest(d) + " from " + sender);
416         if(!heard_from.contains(sender)) { // already received gossip from sender; discard it
417
if(log.isDebugEnabled()) log.debug("already received gossip from " + sender);
418             return;
419         }
420
421         // we won't handle the gossip d, if d's members don't match the membership in my own digest,
422
// this is part of the fix for the NAKACK problem (bugs #943480 and #938584)
423
if(!this.digest.sameSenders(d)) {
424             if(log.isDebugEnabled()) {
425                 log.debug("received digest from " + sender + " (digest=" + d + ") which does not match my own digest ("+
426                         this.digest + "): ignoring digest and re-initializing own digest");
427             }
428             initialize();
429             return;
430         }
431
432         for(int i=0; i < d.size(); i++) {
433             mbr=d.senderAt(i);
434             highest_seqno=d.highSeqnoAt(i);
435             highest_seen_seqno=d.highSeqnoSeenAt(i);
436             if(digest.getIndex(mbr) == -1) {
437                 if(log.isDebugEnabled()) log.debug("sender " + mbr + " not found in stability vector");
438                 continue;
439             }
440
441             // compute the minimum of the highest seqnos deliverable (for garbage collection)
442
my_highest_seqno=digest.highSeqnoAt(mbr);
443             if(my_highest_seqno < 0) {
444                 if(highest_seqno >= 0)
445                     digest.setHighSeqnoAt(mbr, highest_seqno);
446             }
447             else {
448                 digest.setHighSeqnoAt(mbr, Math.min(my_highest_seqno, highest_seqno));
449             }
450
451             // compute the maximum of the highest seqnos seen (for retransmission of last missing message)
452
my_highest_seen_seqno=digest.highSeqnoSeenAt(mbr);
453             if(my_highest_seen_seqno < 0) {
454                 if(highest_seen_seqno >= 0)
455                     digest.setHighSeqnoSeenAt(mbr, highest_seen_seqno);
456             }
457             else {
458                 digest.setHighSeqnoSeenAt(mbr, Math.max(my_highest_seen_seqno, highest_seen_seqno));
459             }
460         }
461
462         heard_from.removeElement(sender);
463         if(heard_from.size() == 0) {
464             if(log.isDebugEnabled()) log.debug("sending stability msg " + printStabilityDigest(digest));
465             sendStabilityMessage(digest.copy());
466             initialize();
467         }
468     }
469
470
471     /**
472      * Bcasts a STABLE message to all group members. Message contains highest seqnos of all members
473      * seen by this member. Highest seqnos are retrieved from the NAKACK layer above.
474      */

475     void sendStableMessage() {
476         Digest d=null;
477         Message msg=new Message(); // mcast message
478
StableHeader hdr;
479
480         if(suspended) {
481             if(log.isTraceEnabled())
482                 log.trace("will not send STABLE message as suspended=" + suspended);
483             return;
484         }
485
486         d=getDigest();
487         if(d != null && d.size() > 0) {
488             if(log.isTraceEnabled())
489                 log.trace("mcasting STABLE msg, digest=" + d +
490                           " (num_gossip_runs=" + num_gossip_runs + ", max_gossip_runs=" + max_gossip_runs + ')');
491             hdr=new StableHeader(StableHeader.STABLE_GOSSIP, d);
492             msg.putHeader(name, hdr);
493             passDown(new Event(Event.MSG, msg));
494         }
495     }
496
497
498
499     Digest getDigest() {
500         Digest ret=null;
501         passDown(new Event(Event.GET_DIGEST_STABLE));
502         ret=(Digest)digest_promise.getResult(digest_timeout);
503         if(ret == null) {
504             if(log.isErrorEnabled())
505                 log.error("digest could not be fetched from below " + "(timeout was " + digest_timeout + " msecs)");
506         }
507         return ret;
508     }
509
510
511     /**
512      Schedules a stability message to be mcast after a random number of milliseconds (range 1-5 secs).
513      The reason for waiting a random amount of time is that, in the worst case, all members receive a
514      STABLE_GOSSIP message from the last outstanding member at the same time and would therefore mcast the
515      STABILITY message at the same time too. To avoid this, each member waits random N msecs. If, before N
516      elapses, some other member sent the STABILITY message, we just cancel our own message. If, during
517      waiting for N msecs to send STABILITY message S1, another STABILITY message S2 is to be sent, we just
518      discard S2.
519      @param tmp A copy of te stability digest, so we don't need to copy it again
520      */

521     void sendStabilityMessage(Digest tmp) {
522         long delay;
523
524         if(timer == null) {
525             if(log.isErrorEnabled())
526                 log.error("timer is null, cannot schedule stability message to be sent");
527             timer=stack != null ? stack.timer : null;
528             return;
529         }
530
531         // give other members a chance to mcast STABILITY message. if we receive STABILITY by the end of
532
// our random sleep, we will not send the STABILITY msg. this prevents that all mbrs mcast a
533
// STABILITY msg at the same time
534
delay=Util.random(stability_delay);
535         startStabilityTask(tmp, delay);
536     }
537
538
539     void handleStabilityMessage(Digest d) {
540         if(d == null) {
541             if(log.isErrorEnabled()) log.error("stability vector is null");
542             return;
543         }
544
545         if(suspended) {
546             if(log.isDebugEnabled()) {
547                 log.debug("STABILITY message will not be handled as suspened=" + suspended);
548             }
549             return;
550         }
551
552         if(log.isDebugEnabled()) log.debug("stability vector is " + d.printHighSeqnos());
553         stopStabilityTask();
554
555         // we won't handle the gossip d, if d's members don't match the membership in my own digest,
556
// this is part of the fix for the NAKACK problem (bugs #943480 and #938584)
557
if(!this.digest.sameSenders(d)) {
558             if(log.isDebugEnabled()) {
559                 log.debug("received digest (digest=" + d + ") which does not match my own digest ("+
560                         this.digest + "): ignoring digest and re-initializing own digest");
561             }
562             initialize();
563             return;
564         }
565
566         // pass STABLE event down the stack, so NAKACK can garbage collect old messages
567
passDown(new Event(Event.STABLE, d));
568     }
569
570
571     String JavaDoc printStabilityDigest(Digest d) {
572         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
573         boolean first=true;
574
575         if(d != null) {
576             for(int i=0; i < d.size(); i++) {
577                 if(!first)
578                     sb.append(", ");
579                 else
580                     first=false;
581                 sb.append(d.senderAt(i) + "#" + d.highSeqnoAt(i) + " (" + d.highSeqnoSeenAt(i) + ')');
582             }
583         }
584         return sb.toString();
585     }
586
587     /* ------------------------------------End of Private Methods ------------------------------------- */
588
589
590
591
592
593
594
595     public static class StableHeader extends Header implements Streamable {
596         static final int STABLE_GOSSIP=1;
597         static final int STABILITY=2;
598
599         int type=0;
600         // Digest digest=new Digest(); // used for both STABLE_GOSSIP and STABILITY message
601
Digest stableDigest=null; // changed by Bela April 4 2004
602

603         public StableHeader() {
604         } // used for externalizable
605

606
607         StableHeader(int type, Digest digest) {
608             this.type=type;
609             this.stableDigest=digest;
610         }
611
612
613         static String JavaDoc type2String(int t) {
614             switch(t) {
615                 case STABLE_GOSSIP:
616                     return "STABLE_GOSSIP";
617                 case STABILITY:
618                     return "STABILITY";
619                 default:
620                     return "<unknown>";
621             }
622         }
623
624         public String JavaDoc toString() {
625             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
626             sb.append('[');
627             sb.append(type2String(type));
628             sb.append("]: digest is ");
629             sb.append(stableDigest);
630             return sb.toString();
631         }
632
633
634         public void writeExternal(ObjectOutput out) throws IOException {
635             out.writeInt(type);
636             if(stableDigest == null) {
637                 out.writeBoolean(false);
638                 return;
639             }
640             out.writeBoolean(true);
641             stableDigest.writeExternal(out);
642         }
643
644
645         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
646             type=in.readInt();
647             boolean digest_not_null=in.readBoolean();
648             if(digest_not_null) {
649                 stableDigest=new Digest();
650                 stableDigest.readExternal(in);
651             }
652         }
653
654         public void writeTo(DataOutputStream out) throws IOException {
655             out.writeInt(type);
656             Util.writeStreamable(stableDigest, out);
657         }
658
659         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException JavaDoc, InstantiationException JavaDoc {
660             type=in.readInt();
661             stableDigest=(Digest)Util.readStreamable(Digest.class, in);
662         }
663
664
665     }
666
667
668
669
670     /**
671      Mcast periodic STABLE message. Interval between sends varies. Terminates after num_gossip_runs is 0.
672      However, UP or DOWN messages will reset num_gossip_runs to max_gossip_runs. This has the effect that the
673      stable_send task terminates only after a period of time within which no messages were either sent
674      or received
675      */

676     private class StableTask implements TimeScheduler.Task {
677         boolean stopped=false;
678
679         public void stop() {
680             stopped=true;
681         }
682
683         public boolean running() { // syntactic sugar
684
return !stopped;
685         }
686
687         public boolean cancelled() {
688             return stopped;
689         }
690
691         public long nextInterval() {
692             long interval=computeSleepTime();
693             if(interval <= 0)
694                 return 10000;
695             else
696                 return interval;
697         }
698
699
700         public void run() {
701             if(suspended) {
702                 if(log.isTraceEnabled())
703                     log.trace("stable task will not run as suspended=" + suspended);
704                 stopStableTask();
705                 return;
706             }
707             initialize();
708             sendStableMessage();
709             num_gossip_runs--;
710             if(num_gossip_runs <= 0) {
711                 if(log.isTraceEnabled())
712                     log.trace("stable task terminating (num_gossip_runs=" +
713                               num_gossip_runs + ", max_gossip_runs=" + max_gossip_runs + ')');
714                 stopStableTask();
715             }
716         }
717
718         long computeSleepTime() {
719             return getRandom((mbrs.size() * desired_avg_gossip * 2));
720         }
721
722         long getRandom(long range) {
723             return (long)((Math.random() * range) % range);
724         }
725     }
726
727
728
729
730
731     /**
732      * Multicasts a STABILITY message.
733      */

734     private class StabilitySendTask implements TimeScheduler.Task {
735         Digest d=null;
736         boolean stopped=false;
737         long delay=2000;
738
739
740         public StabilitySendTask(Digest d, long delay) {
741             this.d=d;
742             this.delay=delay;
743         }
744
745         public boolean running() {
746             return !stopped;
747         }
748
749         public void stop() {
750             stopped=true;
751         }
752
753         public boolean cancelled() {
754             return stopped;
755         }
756
757
758         /** wait a random number of msecs (to give other a chance to send the STABILITY msg first) */
759         public long nextInterval() {
760             return delay;
761         }
762
763
764         public void run() {
765             Message msg;
766             StableHeader hdr;
767
768             if(suspended) {
769                 if(log.isDebugEnabled()) {
770                     log.debug("STABILITY message will not be sent as suspended=" + suspended);
771                 }
772                 stopped=true;
773                 return;
774             }
775
776             if(d != null && !stopped) {
777                 msg=new Message();
778                 hdr=new StableHeader(StableHeader.STABILITY, d);
779                 msg.putHeader(STABLE.name, hdr);
780                 passDown(new Event(Event.MSG, msg));
781                 d=null;
782             }
783             stopped=true; // run only once
784
}
785     }
786
787
788     private class ResumeTask implements TimeScheduler.Task {
789         boolean running=true;
790         long max_suspend_time=0;
791
792         ResumeTask(long max_suspend_time) {
793             this.max_suspend_time=max_suspend_time;
794         }
795
796         void stop() {
797             running=false;
798         }
799
800         public boolean running() {
801             return running;
802         }
803
804         public boolean cancelled() {
805             return running == false;
806         }
807
808         public long nextInterval() {
809             return max_suspend_time;
810         }
811
812         public void run() {
813             if(suspended)
814                 log.warn("ResumeTask resumed message garbage collection - this should be done by a RESUME_STABLE event; " +
815                          "check why this event was not received (or increase max_suspend_time for large state transfers)");
816             resume();
817         }
818     }
819
820
821 }
822
Popular Tags