KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > STABLE


1 // $Id: STABLE.java,v 1.7 2004/09/23 16:29:42 belaban Exp $
2

3 package org.jgroups.protocols;
4
5
6 import org.jgroups.*;
7 import org.jgroups.blocks.GroupRequest;
8 import org.jgroups.blocks.MethodCall;
9 import org.jgroups.stack.RpcProtocol;
10 import org.jgroups.util.TimeScheduler;
11 import org.jgroups.util.Util;
12
13 import java.util.Properties JavaDoc;
14 import java.util.Vector JavaDoc;
15
16
17 /**
18  * Computes the broadcast messages that are stable, i.e. have been received
19  * by all members. Sends STABLE events up the stack when this is the case.
20  * Uses a probabilistic scheme to do so, as described in:<br>
21  * GSGC: An Efficient Gossip-Style Garbage Collection Scheme for Scalable
22  * Reliable Multicast, K. Guo et al., 1997.
23  * <p>
24  * The only difference is that instead of using counters for an estimation of
25  * messages received from each member, we retrieve this actual information
26  * from the NAKACK layer (which must be present for the STABLE protocol to
27  * work).
28  * <p>
29  * Note: the <tt>Event.MSG</tt> call path must be as lightweight as
30  * possible. It should not request any lock for which there is a high
31  * contention and/or long delay.
32  * <p>
33  * <pre>
34  * Changes(igeorg - 2.VI.2001):
35  * i. Thread-safety (in RPC calls most notably on the lines of Gianluca
36  * Collot's bugfix)
37  * ii. All slow calls (RPCs, seqnos requests, etc.) placed outside locks
38  * iii. Removed redundant initialization in adaptation to a higher round
39  * iv. heard_from[this member] is always set to true on every new round
40  * (i.e. on every stability bcast).
41  * v. Replaced gossip thread with <tt>TimeScheduler.Task</tt>
42  * </pre>
43  * <p>
44  * [[[ TODO(igeorg - 2.VI.2001)
45  * i. Faster stability convergence by better selection of gossip subsets
46  * (replace Util.pickSubset()).
47  * ii. Special mutex on the <tt>Event.MSG</tt> call path. I.e. remove
48  * <tt>synchronized(this)</tt> with e.g. <tt>synchronized(msg_mutex)</tt>.
49  * ]] TODO
50  */

51 public class STABLE extends RpcProtocol {
52     /** The protocol name */
53     private static final String JavaDoc PROT_NAME="STABLE";
54
55     /** Default subgroup size for gossiping expressed as percentage overthe group's size */
56     private static final double SUBSET_SIZE=0.1;
57
58     /** Default max number of msgs to wait for before sending gossip */
59     private static final int GOSSIP_MSG_INTERVAL=100;
60
61     /** Default max time to wait before sending gossip (ms) */
62     private static final int GOSSIP_INTERVAL=10000;
63
64     private Address local_addr=null;
65     private ViewId vid=null;
66     private final Vector JavaDoc mbrs=new Vector JavaDoc(11);
67
68     /** gossip round */
69     private long round=1;
70
71     /** highest seqno received for each member (corresponds to membership) */
72     private long[] seqnos=new long[0];
73
74     /** Array of members from which we have received a gossip in the current round */
75     private boolean[] heard_from=new boolean[0];
76
77     /** Percentage of members to which gossip is sent (parameterizable by user) */
78     private double subset=SUBSET_SIZE;
79
80     /** The gossiping task scheduler */
81     private TimeScheduler sched=null;
82
83     private Task gossip_task;
84
85     /** wait for n messages until sending gossip ... */
86     private int max_msgs=GOSSIP_MSG_INTERVAL;
87
88     /** ... or until max_wait_time has elapsed, whichever comes first */
89     private long max_wait_time=GOSSIP_INTERVAL;
90
91     /** Current number of msgs left to be received before sending a gossip */
92     private long num_msgs=max_msgs;
93
94     /** mutex for interacting with NAKACK layer (GET_MSGS_RECVD) */
95     private final Object JavaDoc highest_seqnos_mutex=new Object JavaDoc();
96     
97     /** Time to wait for a reply from NAKACK layer (GET_MSGS_RECVD) */
98     private long highest_seqnos_timeout=4000;
99
100
101     /**
102      * @return this protocol name
103      */

104     public String JavaDoc getName() {
105         return (PROT_NAME);
106     }
107
108
109     /**
110      * The events expected to be handled from some layer above:
111      * <ul>
112      * <li>
113      * GET_MSGS_RECEIVED: NAKACK layer
114      * </li>
115      * </ul>
116      * @return a list of events expected by to be handled from some layer
117      * above
118      */

119     public Vector JavaDoc requiredUpServices() {
120         Vector JavaDoc retval=new Vector JavaDoc(1);
121         retval.addElement(new Integer JavaDoc(Event.GET_MSGS_RECEIVED));
122         return retval;
123     }
124
125     /**
126      * Set the parameters for this layer.
127      *
128      * <ul>
129      * <li>
130      * <i>subset</i>: the percentage of the group'size to which the
131      * msgs_seen_so_far gossip is sent periodically.</li>
132      * <li>
133      * <i>max_msgs</i>: the max number of msgs to wait for between two
134      * consecutive gossipings.</li>
135      * <li>
136      * <i>max_wait_time</i>: the max time to wait for between two consecutive
137      * gossipings.</li>
138      * <li>
139      * <i>highest_seqno_timeout</i>: time to wait to receive from NAKACK
140      * the array of highest deliverable seqnos
141      * </li>
142      * </ul>
143      *
144      * @param props the list of parameters
145      */

146     public boolean setProperties(Properties JavaDoc props) {
147         String JavaDoc str;
148
149         super.setProperties(props);
150         str=props.getProperty("subset");
151         if(str != null) {
152             subset=Float.parseFloat(str);
153             props.remove("subset");
154         }
155
156         str=props.getProperty("max_msgs");
157         if(str != null) {
158             num_msgs=max_msgs=Integer.parseInt(str);
159             if(max_msgs <= 1) {
160                 if(log.isFatalEnabled()) log.fatal("value for 'max_msgs' must be greater than 1 !");
161                 return false;
162             }
163             props.remove("max_msgs");
164         }
165
166         str=props.getProperty("max_wait_time");
167         if(str != null) {
168             max_wait_time=Long.parseLong(str);
169             props.remove("max_wait_time");
170         }
171
172         str=props.getProperty("highest_seqnos_timeout");
173         if(str != null) {
174             highest_seqnos_timeout=Long.parseLong(str);
175             props.remove("highest_seqnos_timeout");
176         }
177
178         if(props.size() > 0) {
179             System.err.println("STABLE.setProperties(): these properties " +
180                     "are not recognized:");
181             props.list(System.out);
182             return false;
183         }
184         return true;
185     }
186
187
188     /**
189      * Start the layer:
190      * i. Set the gossip task scheduler
191      * ii. Reset the layer's state.
192      * iii. Start the gossiping task
193      */

194     public void start() throws Exception JavaDoc {
195         TimeScheduler timer;
196
197         super.start();
198         timer=stack != null ? stack.timer : null;
199         if(timer == null)
200             throw new Exception JavaDoc("STABLE.start(): timer is null");
201
202         sched=timer;
203
204         // we use only asynchronous method invocations...
205
if(_corr != null)
206             _corr.setDeadlockDetection(false);
207         initialize();
208         startGossip();
209     }
210
211
212     /**
213      * Stop scheduling the gossip task
214      */

215     public void stop() {
216         super.stop();
217         synchronized(this) {
218             if(gossip_task != null)
219                 gossip_task.cancel();
220             gossip_task=null;
221         }
222     }
223
224
225     /* ------------------------- Request handler methods ------------------ */
226
227     /**
228      * Contains the highest sequence numbers as seen by <code>sender</code>
229      *
230      * @param view_id The view ID in which the gossip was sent. Must be the
231      * same as ours, otherwise it is discarded
232      *
233      * @param gossip_round The round in which the gossip was sent
234      *
235      * @param gossip_seqnos A vector with the highest sequence numbers as
236      * seen by <code>sender</code>
237      *
238      * @param heard The sender's <code>heard_from</code> array. This allows
239      * us to minimize the gossip msgs for a given round as a member does not
240      * have to receive gossip msgs from each member, but members pass gossips
241      * they've received from others on in their own gossips. E.g. when a
242      * member P (of group {P,Q,R}) receives a gossip from R, its own gossip
243      * to Q might be {R,P}. Q, who hasn't received a gossip from R, will not
244      * need to receive it anymore as it is already sent by P. This simple
245      * scheme reduces the number of gossip messages needed.
246      *
247      * @param sender The sender of the gossip message (obviously :-))
248      */

249     public void gossip(ViewId view_id, long gossip_round,
250                        long[] gossip_seqnos, boolean[] heard, Object JavaDoc sender) {
251         Object JavaDoc[] params;
252         MethodCall call;
253
254         synchronized(this) {
255
256                 if(log.isInfoEnabled()) log.info("sender=" + sender + ", round=" + gossip_round + ", seqnos=" +
257                         Util.array2String(gossip_seqnos) + ", heard=" +
258                         Util.array2String(heard));
259             if(vid == null || view_id == null || !vid.equals(view_id)) {
260
261                     if(log.isInfoEnabled()) log.info("view ID s are different (" + vid + " != " + view_id +
262                             "). Discarding gossip received");
263                 return;
264             }
265             if(gossip_round < this.round) {
266
267                     if(log.isInfoEnabled()) log.info("received a gossip from a previous round (" +
268                             gossip_round + "); my round is " + round +
269                             ". Discarding gossip");
270                 return;
271             }
272             if(gossip_seqnos == null || seqnos == null ||
273                     seqnos.length != gossip_seqnos.length) {
274
275                     if(log.isWarnEnabled()) log.warn("size of seqnos and gossip_seqnos are not equal ! " +
276                             "Discarding gossip");
277                 return;
278             }
279
280             // (1) If round greater than local round:
281
// i. Adjust the local to the received round
282
//
283
// (2)
284
// i. local_seqnos = arrayMin(local_seqnos, gossip_seqnos)
285
// ii. local_heard = arrayMax(local_heard, gossip_heard)
286
// iii. If heard from all, bcast our seqnos (stability vector)
287
if(round == gossip_round) {
288                 update(sender, gossip_seqnos, heard);
289             }
290             else if(round < gossip_round) {
291
292                     if(log.isInfoEnabled()) log.info("received a gossip from a higher round (" +
293                             gossip_round + "); adopting my round (" + round +
294                             ") to " + gossip_round);
295                 round=gossip_round;
296                 set(sender, gossip_seqnos, heard_from);
297             }
298
299              if(log.isInfoEnabled()) log.info("heard_from=" + Util.array2String(heard_from));
300             if(!heardFromAll())
301                 return;
302
303             params=new Object JavaDoc[]{
304                 vid.clone(),
305                 new Long JavaDoc(gossip_round),
306                 seqnos.clone(),
307                 local_addr};
308         } // synchronized(this)
309

310         call=new MethodCall("stability", params,
311             new String JavaDoc[] {ViewId.class.getName(), long.class.getName(), long[].class.getName(), Object JavaDoc.class.getName()});
312         callRemoteMethods(null, call, GroupRequest.GET_NONE, 0);
313     }
314
315
316     /**
317      * Contains the highest message sequence numbers (for each member) that
318      * can safely be deleted (because they have been seen by all members).
319      */

320     public void stability(ViewId view_id, long gossip_round, long[] stability_vector, Object JavaDoc sender) {
321         // i. Proceed to the next round; init the heard from list
322
// ii. Send up the stability vector
323
// iii. get a fresh copy of the highest deliverable seqnos
324
synchronized(this) {
325
326                 if(log.isInfoEnabled()) log.info("sender=" + sender + ", round=" + gossip_round + ", vector=" +
327                         Util.array2String(stability_vector) + ')');
328             if(vid == null || view_id == null || !vid.equals(view_id)) {
329
330                     if(log.isInfoEnabled()) log.info("view ID s are different (" + vid + " != " + view_id +
331                             "). Discarding gossip received");
332                 return;
333             }
334
335             if(round > gossip_round)
336                 return;
337             round=gossip_round + 1;
338             for(int i=0; i < heard_from.length; i++)
339                 heard_from[i]=false;
340         }
341         heard_from[mbrs.indexOf(local_addr)]=true;
342
343         passUp(new Event(Event.STABLE, stability_vector));
344         getHighestSeqnos();
345     }
346
347     /* --------------------- End of Request handler methods --------------- */
348
349     /**
350      * <b>Callback</b>. Called by superclass when event may be handled.
351      * <p>
352      * <b>Do not use <code>PassUp</code> in this method as the event is passed
353      * up by default by the superclass after this method returns !</b>
354      *
355      * @return boolean Defaults to true. If false, event will not be passed
356      * up the stack.
357      */

358     public boolean handleUpEvent(Event evt) {
359         switch(evt.getType()) {
360             case Event.MSG:
361                 if(!upMsg(evt))
362                     return (false);
363                 break;
364             case Event.SET_LOCAL_ADDRESS:
365                 local_addr=(Address)evt.getArg();
366                 break;
367         }
368
369         return true;
370     }
371
372
373     /**
374      * <b>Callback</b>. Called by superclass when event may be handled.
375      * <p>
376      * <b>Do not use <code>PassDown</code> in this method as the event is
377      * passed down by default by the superclass after this method returns !</b>
378      *
379      * @return boolean Defaults to true. If false, event will not be passed
380      * down the stack.
381      */

382     public boolean handleDownEvent(Event evt) {
383         switch(evt.getType()) {
384             case Event.VIEW_CHANGE:
385                 if(!downViewChange(evt))
386                     return (false);
387                 break;
388                 // does anyone else below needs this msg except STABLE?
389
case Event.GET_MSGS_RECEIVED_OK:
390                 if(!downGetMsgsReceived(evt))
391                     return (false);
392                 break;
393         }
394
395         return (true);
396     }
397
398
399     /**
400      * The gossip task that runs periodically
401      */

402     private void gossipRun() {
403         num_msgs=max_msgs;
404         sendGossip();
405     }
406
407
408     /**
409      * <pre>
410      * Reset the state of msg garbage-collection:
411      * i. Reset the table of highest seqnos seen by each member
412      * ii. Reset the tbl of mbrs from which highest seqnos have been recorded
413      * </pre>
414      */

415     private void initialize() {
416         synchronized(this) {
417             seqnos=new long[mbrs.size()];
418             for(int i=0; i < seqnos.length; i++)
419                 seqnos[i]=-1;
420
421             heard_from=new boolean[mbrs.size()];
422             for(int i=0; i < heard_from.length; i++)
423                 heard_from[i]=false;
424         }
425     }
426
427
428     /**
429      * (1)<br>
430      * Merge this member's table of highest seqnos seen by a each member
431      * with the one received from a gossip by another member. The result is
432      * the element-wise minimum of the input arrays. For each entry:<br>
433      *
434      * <tt>seqno[mbr_i] = min(seqno[mbr_i], gossip_seqno[mbr_i])</tt>
435      * <p>
436      *
437      * (2)<br>
438      * Merge the <tt>heard from</tt> tables of this member and the sender of
439      * the gossip. The resulting table is:<br>
440      *
441      * <tt>heard_from[mbr_i] = heard_from[mbr_i] | sender_heard[mbr_i]</tt>
442      *
443      * @param sender the sender of the gossip
444      * @param gossip_seqnos the highest deliverable seqnos of the sender
445      * @param gossip_heard_from the table of members sender has heard from
446      *
447      */

448     private void update(Object JavaDoc sender, long[] gossip_seqnos,
449                         boolean[] gossip_heard_from) {
450         int index;
451
452         synchronized(this) {
453             index=mbrs.indexOf(sender);
454             if(index < 0) {
455                  if(log.isWarnEnabled()) log.warn("sender " + sender + " not found in mbrs !");
456                 return;
457             }
458
459             for(int i=0; i < gossip_seqnos.length; i++)
460                 seqnos[i]=Math.min(seqnos[i], gossip_seqnos[i]);
461
462             heard_from[index]=true;
463             for(int i=0; i < heard_from.length; i++)
464                 heard_from[i]=heard_from[i] | gossip_heard_from[i];
465         }
466     }
467
468
469     /**
470      * Set the seqnos and heard_from arrays to those of the sender. The
471      * method is called when the sender seems to know more than this member.
472      * The situation occurs if either:
473      * <ul>
474      * <li>
475      * sender.heard_from > this.heard_from, i.e. the sender has heard
476      * from more members than we have</li>
477      * <li>
478      * sender.round > this.round, i.e. the sender is in a more recent round
479      * than we are</li>
480      * </ul>
481      *
482      * In both cases, this member is assigned the state of the sender
483      */

484     private void set(Object JavaDoc sender, long[] gossip_seqnos,
485                      boolean[] gossip_heard_from) {
486         int index;
487
488         synchronized(this) {
489             index=mbrs.indexOf(sender);
490             if(index < 0) {
491                  if(log.isWarnEnabled()) log.warn("sender " + sender + " not found in mbrs !");
492                 return;
493             }
494
495             seqnos=gossip_seqnos;
496             heard_from=gossip_heard_from;
497         }
498     }
499
500
501     /**
502      * @return true, if we have received the highest deliverable seqnos
503      * directly or indirectly from all members
504      */

505     private boolean heardFromAll() {
506         synchronized(this) {
507             if(heard_from == null) return false;
508             for(int i=0; i < heard_from.length; i++)
509                 if(!heard_from[i])
510                     return false;
511         }
512
513         return true;
514     }
515
516
517     /**
518      * Send our <code>seqnos</code> array to a subset of the membership
519      */

520     private void sendGossip() {
521         Vector JavaDoc gossip_subset;
522         Object JavaDoc[] params;
523         MethodCall call;
524
525         synchronized(this) {
526             gossip_subset=Util.pickSubset(mbrs, subset);
527             if(gossip_subset == null || gossip_subset.size() < 1) {
528                  if(log.isWarnEnabled()) log.warn("picked empty subset !");
529                 return;
530             }
531
532
533                 if(log.isInfoEnabled()) log.info("subset=" + gossip_subset + ", round=" + round + ", seqnos=" +
534                         Util.array2String(seqnos));
535
536             params=new Object JavaDoc[]{
537                 vid.clone(),
538                 new Long JavaDoc(round),
539                 seqnos.clone(),
540                 heard_from.clone(),
541                 local_addr};
542         }
543
544         call=new MethodCall("gossip", params,
545             new String JavaDoc[] {ViewId.class.getName(), long.class.getName(), long[].class.getName(), boolean[].class.getName(), Object JavaDoc.class.getName()});
546         for(int i=0; i < gossip_subset.size(); i++) {
547             try {
548                 callRemoteMethod((Address)gossip_subset.get(i), call, GroupRequest.GET_NONE, 0);
549             }
550             catch(Exception JavaDoc e) {
551                  if(log.isDebugEnabled()) log.debug("exception=" + e);
552             }
553         }
554     }
555
556
557     /**
558      * Sends GET_MSGS_RECEIVED to NAKACK layer (above us !) and stores result
559      * in <code>seqnos</code>. In case <code>seqnos</code> does not yet exist
560      * it creates and initializes it.
561      */

562     private void getHighestSeqnos() {
563         synchronized(highest_seqnos_mutex) {
564             passUp(new Event(Event.GET_MSGS_RECEIVED));
565
566             try {
567                 highest_seqnos_mutex.wait(highest_seqnos_timeout);
568             }
569             catch(InterruptedException JavaDoc e) {
570
571                     if(log.isErrorEnabled()) log.error("Interrupted while waiting for highest seqnos from NAKACK");
572             }
573         }
574     }
575
576
577     /**
578      * Start scheduling the gossip task
579      */

580     private void startGossip() {
581         synchronized(this) {
582             if(gossip_task != null)
583                 gossip_task.cancel();
584             gossip_task=new Task(new Times(new long[]{GOSSIP_INTERVAL}));
585             sched.add(gossip_task);
586         }
587     }
588
589
590     /**
591      * Received a <tt>MSG</tt> event from a layer below
592      *
593      * A msg received:
594      * If unicast ignore; if multicast and time for gossiping has been
595      * reached, send out a gossip to a subset of the mbrs
596      *
597      * @return true if the event should be forwarded to the layer above
598      */

599     private boolean upMsg(Event e) {
600         Message msg=(Message)e.getArg();
601
602         if(msg.getDest() != null && (!msg.getDest().isMulticastAddress()))
603             return (true);
604
605         synchronized(this) {
606             --num_msgs;
607             if(num_msgs > 0)
608                 return (true);
609             num_msgs=max_msgs;
610
611             gossip_task.cancel();
612             gossip_task=new Task(new Times(new long[]{0, GOSSIP_INTERVAL}));
613             sched.add(gossip_task);
614         }
615
616         return (true);
617     }
618
619
620     /**
621      * Received a <tt>VIEW_CHANGE</tt> event from a layer above
622      *
623      * A new view:
624      * i. Set the new mbrs list and the new view ID.
625      * ii. Reset the highest deliverable seqnos seen
626      *
627      * @return true if the event should be forwarded to the layer below
628      */

629     private boolean downViewChange(Event e) {
630         View v=(View)e.getArg();
631         Vector JavaDoc new_mbrs=v.getMembers();
632
633         /*
634           // Could this ever happen? GMS is always sending non-null value
635           if(new_mbrs == null) {
636           / Trace.println(
637           "STABLE.handleDownEvent()", Trace.ERROR,
638           "Received VIEW_CHANGE event with null mbrs list");
639           break;
640           }
641         */

642
643         synchronized(this) {
644             vid=v.getVid();
645             mbrs.clear();
646             mbrs.addAll(new_mbrs);
647             initialize();
648         }
649
650         return (true);
651     }
652
653
654     /**
655      * Received a <tt>GET_MSGS__RECEIVED_OK</tt> event from a layer above
656      *
657      * Updated list of highest deliverable seqnos:
658      * i. Update the local copy of highest deliverable seqnos
659      *
660      * @return true if the event should be forwarded to the layer below
661      */

662     private boolean downGetMsgsReceived(Event e) {
663         long[] new_seqnos=(long[])e.getArg();
664
665         try {
666             synchronized(this) {
667                 if(new_seqnos == null)
668                     return (true);
669                 if(new_seqnos.length != seqnos.length) {
670
671                         if(log.isInfoEnabled()) log.info("GET_MSGS_RECEIVED: array of highest " +
672                                 "seqnos seen so far (received from NAKACK layer) " +
673                                 "has a different length (" + new_seqnos.length +
674                                 ") from 'seqnos' array (" + seqnos.length + ')');
675                     return (true);
676                 }
677                 System.arraycopy(new_seqnos, 0, seqnos, 0, seqnos.length);
678             }
679
680         }
681         finally {
682             synchronized(highest_seqnos_mutex) {
683                 highest_seqnos_mutex.notifyAll();
684             }
685         }
686
687         return (true);
688     }
689
690
691     /**
692      * Select next interval from list. Once the end of the list is reached,
693      * keep returning the last value. It would be sensible that list of
694      * times is in increasing order
695      */

696     private static class Times {
697         private int next=0;
698         private long[] times;
699
700         public Times(long[] times) {
701             if(times.length == 0)
702                 throw new IllegalArgumentException JavaDoc("times");
703             this.times=times;
704         }
705
706         public synchronized long next() {
707             if(next >= times.length)
708                 return (times[times.length - 1]);
709             else
710                 return (times[next++]);
711         }
712
713         public long[] times() {
714             return (times);
715         }
716
717         public synchronized void reset() {
718             next=0;
719         }
720     }
721
722
723     /**
724      * The gossiping task
725      */

726     private class Task implements TimeScheduler.Task {
727         private final Times intervals;
728         private boolean cancelled=false;
729
730         public Task(Times intervals) {
731             this.intervals=intervals;
732         }
733
734         public long nextInterval() {
735             return (intervals.next());
736         }
737
738         public boolean cancelled() {
739             return (cancelled);
740         }
741
742         public void cancel() {
743             cancelled=true;
744         }
745
746         public void run() {
747             gossipRun();
748         }
749     }
750 }
751
Popular Tags