KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > pbcast > GMS


1 // $Id: GMS.java,v 1.29 2005/04/20 20:25:44 belaban Exp $
2

3 package org.jgroups.protocols.pbcast;
4
5
6 import org.jgroups.*;
7 import org.jgroups.stack.Protocol;
8 import org.jgroups.util.BoundedList;
9 import org.jgroups.util.TimeScheduler;
10 import org.jgroups.util.Util;
11 import org.jgroups.util.Streamable;
12
13 import java.io.*;
14 import java.util.Hashtable JavaDoc;
15 import java.util.Iterator JavaDoc;
16 import java.util.Properties JavaDoc;
17 import java.util.Vector JavaDoc;
18
19
20
21
22 /**
23  * Group membership protocol. Handles joins/leaves/crashes (suspicions) and emits new views
24  * accordingly. Use VIEW_ENFORCER on top of this layer to make sure new members don't receive
25  * any messages until they are members.
26  */

27 public class GMS extends Protocol {
28     private GmsImpl impl=null;
29     Address local_addr=null;
30     final Membership members=new Membership(); // real membership
31
private final Membership tmp_members=new Membership(); // base for computing next view
32

33     /** Members joined but for which no view has been received yet */
34     private final Vector JavaDoc joining=new Vector JavaDoc(7);
35
36     /** Members excluded from group, but for which no view has been received yet */
37     private final Vector JavaDoc leaving=new Vector JavaDoc(7);
38
39     ViewId view_id=null;
40     private long ltime=0;
41     long join_timeout=5000;
42     long join_retry_timeout=2000;
43     long leave_timeout=5000;
44     private long digest_timeout=5000; // time to wait for a digest (from PBCAST). should be fast
45
long merge_timeout=10000; // time to wait for all MERGE_RSPS
46
private final Object JavaDoc impl_mutex=new Object JavaDoc(); // synchronizes event entry into impl
47
private final Object JavaDoc digest_mutex=new Object JavaDoc(); // synchronizes the GET_DIGEST/GET_DIGEST_OK events
48
private Digest digest=null; // holds result of GET_DIGEST event
49
private final Hashtable JavaDoc impls=new Hashtable JavaDoc(3);
50     private boolean shun=true;
51     boolean merge_leader=false; // can I initiate a merge ?
52
private boolean print_local_addr=true;
53     boolean disable_initial_coord=false; // can the member become a coord on startup or not ?
54
static final String JavaDoc CLIENT="Client";
55     static final String JavaDoc COORD="Coordinator";
56     static final String JavaDoc PART="Participant";
57     TimeScheduler timer=null;
58
59     /** Max number of old members to keep in history */
60     protected int num_prev_mbrs=50;
61
62     /** Keeps track of old members (up to num_prev_mbrs) */
63     BoundedList prev_members=null;
64
65     static final String JavaDoc name="GMS";
66
67
68
69     public GMS() {
70         initState();
71     }
72
73
74     public String JavaDoc getName() {
75         return name;
76     }
77
78
79     public Vector JavaDoc requiredDownServices() {
80         Vector JavaDoc retval=new Vector JavaDoc(3);
81         retval.addElement(new Integer JavaDoc(Event.GET_DIGEST));
82         retval.addElement(new Integer JavaDoc(Event.SET_DIGEST));
83         retval.addElement(new Integer JavaDoc(Event.FIND_INITIAL_MBRS));
84         return retval;
85     }
86
87
88     public void setImpl(GmsImpl new_impl) {
89         synchronized(impl_mutex) {
90             impl=new_impl;
91             if(log.isDebugEnabled()) {
92                 String JavaDoc msg=(local_addr != null? local_addr.toString()+" " : "") + "changed role to " + new_impl.getClass().getName();
93                 log.debug(msg);
94             }
95         }
96     }
97
98
99     public GmsImpl getImpl() {
100         return impl;
101     }
102
103
104     public void init() throws Exception JavaDoc {
105         prev_members=new BoundedList(num_prev_mbrs);
106         timer=stack != null? stack.timer : null;
107         if(timer == null)
108             throw new Exception JavaDoc("GMS.init(): timer is null");
109         if(impl != null)
110             impl.init();
111     }
112
113     public void start() throws Exception JavaDoc {
114         if(impl != null) impl.start();
115     }
116
117     public void stop() {
118         if(impl != null) impl.stop();
119         if(prev_members != null)
120             prev_members.removeAll();
121     }
122
123
124     public void becomeCoordinator() {
125         CoordGmsImpl tmp=(CoordGmsImpl)impls.get(COORD);
126         if(tmp == null) {
127             tmp=new CoordGmsImpl(this);
128             impls.put(COORD, tmp);
129         }
130         try {
131             tmp.init();
132         }
133         catch(Exception JavaDoc e) {
134             log.error("exception switching to coordinator role", e);
135         }
136         setImpl(tmp);
137     }
138
139
140     public void becomeParticipant() {
141         ParticipantGmsImpl tmp=(ParticipantGmsImpl)impls.get(PART);
142
143         if(tmp == null) {
144             tmp=new ParticipantGmsImpl(this);
145             impls.put(PART, tmp);
146         }
147         try {
148             tmp.init();
149         }
150         catch(Exception JavaDoc e) {
151             log.error("exception switching to participant", e);
152         }
153         setImpl(tmp);
154     }
155
156     public void becomeClient() {
157         ClientGmsImpl tmp=(ClientGmsImpl)impls.get(CLIENT);
158         if(tmp == null) {
159             tmp=new ClientGmsImpl(this);
160             impls.put(CLIENT, tmp);
161         }
162         try {
163             tmp.init();
164         }
165         catch(Exception JavaDoc e) {
166             log.error("exception switching to client role", e);
167         }
168         setImpl(tmp);
169     }
170
171
172     boolean haveCoordinatorRole() {
173         return impl != null && impl instanceof CoordGmsImpl;
174     }
175
176
177     /**
178      * Computes the next view. Returns a copy that has <code>old_mbrs</code> and
179      * <code>suspected_mbrs</code> removed and <code>new_mbrs</code> added.
180      */

181     public View getNextView(Vector JavaDoc new_mbrs, Vector JavaDoc old_mbrs, Vector JavaDoc suspected_mbrs) {
182         Vector JavaDoc mbrs;
183         long vid=0;
184         View v;
185         Membership tmp_mbrs=null;
186         Address tmp_mbr;
187
188         synchronized(members) {
189             if(view_id == null) {
190                 log.error("view_id is null");
191                 return null; // this should *never* happen !
192
}
193             vid=Math.max(view_id.getId(), ltime) + 1;
194             ltime=vid;
195             if(log.isDebugEnabled()) log.debug("VID=" + vid + ", current members=" +
196                     Util.printMembers(members.getMembers()) +
197                     ", new_mbrs=" + Util.printMembers(new_mbrs) +
198                     ", old_mbrs=" + Util.printMembers(old_mbrs) + ", suspected_mbrs=" +
199                     Util.printMembers(suspected_mbrs));
200
201             tmp_mbrs=tmp_members.copy(); // always operate on the temporary membership
202
tmp_mbrs.remove(suspected_mbrs);
203             tmp_mbrs.remove(old_mbrs);
204             tmp_mbrs.add(new_mbrs);
205             mbrs=tmp_mbrs.getMembers();
206             v=new View(local_addr, vid, mbrs);
207
208             // Update membership (see DESIGN for explanation):
209
tmp_members.set(mbrs);
210
211             // Update joining list (see DESIGN for explanation)
212
if(new_mbrs != null) {
213                 for(int i=0; i < new_mbrs.size(); i++) {
214                     tmp_mbr=(Address)new_mbrs.elementAt(i);
215                     if(!joining.contains(tmp_mbr))
216                         joining.addElement(tmp_mbr);
217                 }
218             }
219
220             // Update leaving list (see DESIGN for explanations)
221
if(old_mbrs != null) {
222                 for(Iterator JavaDoc it=old_mbrs.iterator(); it.hasNext();) {
223                     Address addr=(Address)it.next();
224                     if(!leaving.contains(addr))
225                         leaving.add(addr);
226                 }
227             }
228             if(suspected_mbrs != null) {
229                 for(Iterator JavaDoc it=suspected_mbrs.iterator(); it.hasNext();) {
230                     Address addr=(Address)it.next();
231                     if(!leaving.contains(addr))
232                         leaving.add(addr);
233                 }
234             }
235
236             if(log.isDebugEnabled()) log.debug("new view is " + v);
237             return v;
238         }
239     }
240
241
242     /**
243      Compute a new view, given the current view, the new members and the suspected/left
244      members. Then simply mcast the view to all members. This is different to the VS GMS protocol,
245      in which we run a FLUSH protocol which tries to achive consensus on the set of messages mcast in
246      the current view before proceeding to install the next view.
247
248      The members for the new view are computed as follows:
249      <pre>
250      existing leaving suspected joining
251
252      1. new_view y n n y
253      2. tmp_view y y n y
254      (view_dest)
255      </pre>
256
257      <ol>
258      <li>
259      The new view to be installed includes the existing members plus the joining ones and
260      excludes the leaving and suspected members.
261      <li>
262      A temporary view is sent down the stack as an <em>event</em>. This allows the bottom layer
263      (e.g. UDP or TCP) to determine the members to which to send a multicast message. Compared
264      to the new view, leaving members are <em>included</em> since they have are waiting for a
265      view in which they are not members any longer before they leave. So, if we did not set a
266      temporary view, joining members would not receive the view (signalling that they have been
267      joined successfully). The temporary view is essentially the current view plus the joining
268      members (old members are still part of the current view).
269      </ol>
270      @return View The new view
271      */

272     public View castViewChange(Vector JavaDoc new_mbrs, Vector JavaDoc old_mbrs, Vector JavaDoc suspected_mbrs) {
273         View new_view;
274
275         // next view: current mbrs + new_mbrs - old_mbrs - suspected_mbrs
276
new_view=getNextView(new_mbrs, old_mbrs, suspected_mbrs);
277         castViewChange(new_view);
278         return new_view;
279     }
280
281
282     public void castViewChange(View new_view) {
283         castViewChange(new_view, null);
284     }
285
286
287     public void castViewChange(View new_view, Digest digest) {
288         Message view_change_msg;
289         GmsHeader hdr;
290
291         if(log.isDebugEnabled()) log.debug("mcasting view {" + new_view + "} (" + new_view.size() + " mbrs)\n");
292         view_change_msg=new Message(); // bcast to all members
293
hdr=new GmsHeader(GmsHeader.VIEW, new_view);
294         hdr.my_digest=digest;
295         view_change_msg.putHeader(name, hdr);
296         passDown(new Event(Event.MSG, view_change_msg));
297     }
298
299
300     /**
301      * Sets the new view and sends a VIEW_CHANGE event up and down the stack. If the view is a MergeView (subclass
302      * of View), then digest will be non-null and has to be set before installing the view.
303      */

304     public void installView(View new_view, Digest digest) {
305         if(digest != null)
306             mergeDigest(digest);
307         installView(new_view);
308     }
309
310
311     /**
312      * Sets the new view and sends a VIEW_CHANGE event up and down the stack.
313      */

314     public void installView(View new_view) {
315         Address coord;
316         int rc;
317         ViewId vid=new_view.getVid();
318         Vector JavaDoc mbrs=new_view.getMembers();
319
320         if(log.isDebugEnabled()) log.debug("[local_addr=" + local_addr + "] view is " + new_view);
321
322         // Discards view with id lower than our own. Will be installed without check if first view
323
if(view_id != null) {
324             rc=vid.compareTo(view_id);
325             if(rc <= 0) {
326                 if(log.isDebugEnabled())
327                     log.debug("[" + local_addr + "] received view <= current view;" +
328                               " discarding it (current vid: " + view_id + ", new vid: " + vid + ')');
329                 return;
330             }
331         }
332
333         ltime=Math.max(vid.getId(), ltime); // compute Lamport logical time
334

335         /* Check for self-inclusion: if I'm not part of the new membership, I just discard it.
336         This ensures that messages sent in view V1 are only received by members of V1 */

337         if(checkSelfInclusion(mbrs) == false) {
338             if(log.isWarnEnabled()) log.warn("checkSelfInclusion() failed, " + local_addr +
339                     " is not a member of view " + new_view + "; discarding view");
340
341             // only shun if this member was previously part of the group. avoids problem where multiple
342
// members (e.g. X,Y,Z) join {A,B} concurrently, X is joined first, and Y and Z get view
343
// {A,B,X}, which would cause Y and Z to be shunned as they are not part of the membership
344
// bela Nov 20 2003
345
if(shun && local_addr != null && prev_members.contains(local_addr)) {
346                 if(log.isWarnEnabled())
347                     log.warn("I (" + local_addr + ") am being shunned, will leave and " +
348                             "rejoin group (prev_members are " + prev_members + ')');
349                 if(impl != null)
350                     impl.handleExit();
351                 passUp(new Event(Event.EXIT));
352             }
353             return;
354         }
355
356         synchronized(members) { // serialize access to views
357
// assign new_view to view_id
358
view_id=vid.copy();
359
360             // Set the membership. Take into account joining members
361
if(mbrs != null && mbrs.size() > 0) {
362                 members.set(mbrs);
363                 tmp_members.set(members);
364                 joining.removeAll(mbrs); // remove all members in mbrs from joining
365
// remove all elements from 'leaving' that are not in 'mbrs'
366
leaving.retainAll(mbrs);
367
368                 tmp_members.add(joining); // add members that haven't yet shown up in the membership
369
tmp_members.remove(leaving); // remove members that haven't yet been removed from the membership
370

371                 // add to prev_members
372
for(Iterator JavaDoc it=mbrs.iterator(); it.hasNext();) {
373                     Address addr=(Address)it.next();
374                     if(!prev_members.contains(addr))
375                         prev_members.add(addr);
376                 }
377             }
378
379             // Send VIEW_CHANGE event up and down the stack:
380
Event view_event=new Event(Event.VIEW_CHANGE, new_view.clone());
381             passDown(view_event); // needed e.g. by failure detector or UDP
382
passUp(view_event);
383
384             coord=determineCoordinator();
385             // if(coord != null && coord.equals(local_addr) && !(coord.equals(vid.getCoordAddress()))) {
386
// changed on suggestion by yaronr and Nicolas Piedeloupe
387
if(coord != null && coord.equals(local_addr) && !haveCoordinatorRole()) {
388                 becomeCoordinator();
389             }
390             else {
391                 if(haveCoordinatorRole() && !local_addr.equals(coord))
392                     becomeParticipant();
393             }
394         }
395     }
396
397
398     protected Address determineCoordinator() {
399         synchronized(members) {
400             return members != null && members.size() > 0? (Address)members.elementAt(0) : null;
401         }
402     }
403
404
405     /** Checks whether the potential_new_coord would be the new coordinator (2nd in line) */
406     protected boolean wouldBeNewCoordinator(Address potential_new_coord) {
407         Address new_coord=null;
408
409         if(potential_new_coord == null) return false;
410
411         synchronized(members) {
412             if(members.size() < 2) return false;
413             new_coord=(Address)members.elementAt(1); // member at 2nd place
414
if(new_coord != null && new_coord.equals(potential_new_coord))
415                 return true;
416             return false;
417         }
418     }
419
420
421     /** Returns true if local_addr is member of mbrs, else false */
422     protected boolean checkSelfInclusion(Vector JavaDoc mbrs) {
423         Object JavaDoc mbr;
424         if(mbrs == null)
425             return false;
426         for(int i=0; i < mbrs.size(); i++) {
427             mbr=mbrs.elementAt(i);
428             if(mbr != null && local_addr.equals(mbr))
429                 return true;
430         }
431         return false;
432     }
433
434
435     public View makeView(Vector JavaDoc mbrs) {
436         Address coord=null;
437         long id=0;
438
439         if(view_id != null) {
440             coord=view_id.getCoordAddress();
441             id=view_id.getId();
442         }
443         return new View(coord, id, mbrs);
444     }
445
446
447     public View makeView(Vector JavaDoc mbrs, ViewId vid) {
448         Address coord=null;
449         long id=0;
450
451         if(vid != null) {
452             coord=vid.getCoordAddress();
453             id=vid.getId();
454         }
455         return new View(coord, id, mbrs);
456     }
457
458
459     /** Send down a SET_DIGEST event */
460     public void setDigest(Digest d) {
461         passDown(new Event(Event.SET_DIGEST, d));
462     }
463
464
465     /** Send down a MERGE_DIGEST event */
466     public void mergeDigest(Digest d) {
467         passDown(new Event(Event.MERGE_DIGEST, d));
468     }
469
470
471     /** Sends down a GET_DIGEST event and waits for the GET_DIGEST_OK response, or
472      timeout, whichever occurs first */

473     public Digest getDigest() {
474         Digest ret=null;
475
476         synchronized(digest_mutex) {
477             digest=null;
478             passDown(Event.GET_DIGEST_EVT);
479             if(digest == null) {
480                 try {
481                     digest_mutex.wait(digest_timeout);
482                 }
483                 catch(Exception JavaDoc ex) {
484                 }
485             }
486             if(digest != null) {
487                 ret=digest;
488                 digest=null;
489                 return ret;
490             }
491             else {
492                 if(log.isErrorEnabled()) log.error("digest could not be fetched from below");
493                 return null;
494             }
495         }
496     }
497
498
499     public void up(Event evt) {
500         Object JavaDoc obj;
501         Message msg;
502         GmsHeader hdr;
503         MergeData merge_data;
504
505         switch(evt.getType()) {
506
507             case Event.MSG:
508                 msg=(Message)evt.getArg();
509                 obj=msg.getHeader(name);
510                 if(obj == null || !(obj instanceof GmsHeader))
511                     break;
512                 hdr=(GmsHeader)msg.removeHeader(name);
513                 switch(hdr.type) {
514                     case GmsHeader.JOIN_REQ:
515                         handleJoinRequest(hdr.mbr);
516                         break;
517                     case GmsHeader.JOIN_RSP:
518                         impl.handleJoinResponse(hdr.join_rsp);
519                         break;
520                     case GmsHeader.LEAVE_REQ:
521                         if(log.isDebugEnabled()) log.debug("received LEAVE_REQ " + hdr + " from " + msg.getSrc());
522                         if(hdr.mbr == null) {
523                             if(log.isErrorEnabled()) log.error("LEAVE_REQ's mbr field is null");
524                             return;
525                         }
526                         // sendLeaveResponse(hdr.mbr);
527
impl.handleLeave(hdr.mbr, false);
528                         break;
529                     case GmsHeader.LEAVE_RSP:
530                         impl.handleLeaveResponse();
531                         break;
532                     case GmsHeader.VIEW:
533                         if(hdr.view == null) {
534                             if(log.isErrorEnabled()) log.error("[VIEW]: view == null");
535                             return;
536                         }
537                         impl.handleViewChange(hdr.view, hdr.my_digest);
538                         break;
539
540                     case GmsHeader.MERGE_REQ:
541                         impl.handleMergeRequest(msg.getSrc(), hdr.merge_id);
542                         break;
543
544                     case GmsHeader.MERGE_RSP:
545                         merge_data=new MergeData(msg.getSrc(), hdr.view, hdr.my_digest);
546                         merge_data.merge_rejected=hdr.merge_rejected;
547                         impl.handleMergeResponse(merge_data, hdr.merge_id);
548                         break;
549
550                     case GmsHeader.INSTALL_MERGE_VIEW:
551                         impl.handleMergeView(new MergeData(msg.getSrc(), hdr.view, hdr.my_digest), hdr.merge_id);
552                         break;
553
554                     case GmsHeader.CANCEL_MERGE:
555                         impl.handleMergeCancelled(hdr.merge_id);
556                         break;
557
558                     default:
559                         if(log.isErrorEnabled()) log.error("GmsHeader with type=" + hdr.type + " not known");
560                 }
561                 return; // don't pass up
562

563             case Event.CONNECT_OK: // sent by someone else, but WE are responsible for sending this !
564
case Event.DISCONNECT_OK: // dito (e.g. sent by UDP layer). Don't send up the stack
565
return;
566
567
568             case Event.SET_LOCAL_ADDRESS:
569                 local_addr=(Address)evt.getArg();
570                 if(print_local_addr) {
571                     System.out.println("\n-------------------------------------------------------\n" +
572                                        "GMS: address is " + local_addr +
573                                        "\n-------------------------------------------------------");
574                 }
575                 break; // pass up
576

577             case Event.SUSPECT:
578                 impl.suspect((Address)evt.getArg());
579                 break; // pass up
580

581             case Event.UNSUSPECT:
582                 impl.unsuspect((Address)evt.getArg());
583                 return; // discard
584

585             case Event.MERGE:
586                 impl.merge((Vector JavaDoc)evt.getArg());
587                 return; // don't pass up
588
}
589
590         if(impl.handleUpEvent(evt))
591             passUp(evt);
592     }
593
594
595     /**
596      This method is overridden to avoid hanging on getDigest(): when a JOIN is received, the coordinator needs
597      to retrieve the digest from the PBCAST layer. It therefore sends down a GET_DIGEST event, to which the PBCAST layer
598      responds with a GET_DIGEST_OK event.<p>
599      However, the GET_DIGEST_OK event will not be processed because the thread handling the JOIN request won't process
600      the GET_DIGEST_OK event until the JOIN event returns. The receiveUpEvent() method is executed by the up-handler
601      thread of the lower protocol and therefore can handle the event. All we do here is unblock the mutex on which
602      JOIN is waiting, allowing JOIN to return with a valid digest. The GET_DIGEST_OK event is then discarded, because
603      it won't be processed twice.
604      */

605     public void receiveUpEvent(Event evt) {
606         if(evt.getType() == Event.GET_DIGEST_OK) {
607             synchronized(digest_mutex) {
608                 digest=(Digest)evt.getArg();
609                 digest_mutex.notifyAll();
610             }
611             return;
612         }
613         super.receiveUpEvent(evt);
614     }
615
616
617     public void down(Event evt) {
618         switch(evt.getType()) {
619
620             case Event.CONNECT:
621                 passDown(evt);
622                 if(local_addr == null)
623                     if(log.isFatalEnabled()) log.fatal("[CONNECT] local_addr is null");
624                 impl.join(local_addr);
625                 passUp(new Event(Event.CONNECT_OK));
626                 return; // don't pass down: was already passed down
627

628             case Event.DISCONNECT:
629                 impl.leave((Address)evt.getArg());
630                 passUp(new Event(Event.DISCONNECT_OK));
631                 initState(); // in case connect() is called again
632
break; // pass down
633
}
634
635         if(impl.handleDownEvent(evt))
636             passDown(evt);
637     }
638
639
640     /** Setup the Protocol instance according to the configuration string */
641     public boolean setProperties(Properties JavaDoc props) {
642         String JavaDoc str;
643
644         super.setProperties(props);
645         str=props.getProperty("shun");
646         if(str != null) {
647             shun=Boolean.valueOf(str).booleanValue();
648             props.remove("shun");
649         }
650
651         str=props.getProperty("merge_leader");
652         if(str != null) {
653             merge_leader=Boolean.valueOf(str).booleanValue();
654             props.remove("merge_leader");
655         }
656
657         str=props.getProperty("print_local_addr");
658         if(str != null) {
659             print_local_addr=Boolean.valueOf(str).booleanValue();
660             props.remove("print_local_addr");
661         }
662
663         str=props.getProperty("join_timeout"); // time to wait for JOIN
664
if(str != null) {
665             join_timeout=Long.parseLong(str);
666             props.remove("join_timeout");
667         }
668
669         str=props.getProperty("join_retry_timeout"); // time to wait between JOINs
670
if(str != null) {
671             join_retry_timeout=Long.parseLong(str);
672             props.remove("join_retry_timeout");
673         }
674
675         str=props.getProperty("leave_timeout"); // time to wait until coord responds to LEAVE req.
676
if(str != null) {
677             leave_timeout=Long.parseLong(str);
678             props.remove("leave_timeout");
679         }
680
681         str=props.getProperty("merge_timeout"); // time to wait for MERGE_RSPS from subgroup coordinators
682
if(str != null) {
683             merge_timeout=Long.parseLong(str);
684             props.remove("merge_timeout");
685         }
686
687         str=props.getProperty("digest_timeout"); // time to wait for GET_DIGEST_OK from PBCAST
688
if(str != null) {
689             digest_timeout=Long.parseLong(str);
690             props.remove("digest_timeout");
691         }
692
693         str=props.getProperty("disable_initial_coord");
694         if(str != null) {
695             disable_initial_coord=Boolean.valueOf(str).booleanValue();
696             props.remove("disable_initial_coord");
697         }
698
699         str=props.getProperty("num_prev_mbrs");
700         if(str != null) {
701             num_prev_mbrs=Integer.parseInt(str);
702             props.remove("num_prev_mbrs");
703         }
704
705         if(props.size() > 0) {
706             System.err.println("GMS.setProperties(): the following properties are not recognized:");
707             props.list(System.out);
708             return false;
709         }
710         return true;
711     }
712
713
714
715     /* ------------------------------- Private Methods --------------------------------- */
716
717     void initState() {
718         becomeClient();
719         view_id=null;
720     }
721
722
723     void handleJoinRequest(Address mbr) {
724         JoinRsp join_rsp;
725         Message m;
726         GmsHeader hdr;
727
728         if(mbr == null) {
729             if(log.isErrorEnabled()) log.error("mbr is null");
730             return;
731         }
732
733         if(log.isDebugEnabled()) log.debug("mbr=" + mbr);
734
735         // 1. Get the new view and digest
736
join_rsp=impl.handleJoin(mbr);
737         if(join_rsp == null)
738             if(log.isErrorEnabled())
739                 log.error(impl.getClass().toString() + ".handleJoin(" + mbr +
740                         ") returned null: will not be able to multicast new view");
741
742         // 2. Send down a local TMP_VIEW event. This is needed by certain layers (e.g. NAKACK) to compute correct digest
743
// in case client's next request (e.g. getState()) reaches us *before* our own view change multicast.
744
// Check NAKACK's TMP_VIEW handling for details
745
if(join_rsp != null && join_rsp.getView() != null)
746             passDown(new Event(Event.TMP_VIEW, join_rsp.getView()));
747
748         // 3. Return result to client
749
m=new Message(mbr, null, null);
750         hdr=new GmsHeader(GmsHeader.JOIN_RSP, join_rsp);
751         m.putHeader(name, hdr);
752         passDown(new Event(Event.MSG, m));
753
754         // 4. Bcast the new view
755
if(join_rsp != null)
756             castViewChange(join_rsp.getView());
757     }
758
759
760
761
762     /* --------------------------- End of Private Methods ------------------------------- */
763
764
765
766     public static class GmsHeader extends Header implements Streamable {
767         public static final int JOIN_REQ=1;
768         public static final int JOIN_RSP=2;
769         public static final int LEAVE_REQ=3;
770         public static final int LEAVE_RSP=4;
771         public static final int VIEW=5;
772         public static final int MERGE_REQ=6;
773         public static final int MERGE_RSP=7;
774         public static final int INSTALL_MERGE_VIEW=8;
775         public static final int CANCEL_MERGE=9;
776
777         int type=0;
778         View view=null; // used when type=VIEW or MERGE_RSP or INSTALL_MERGE_VIEW
779
Address mbr=null; // used when type=JOIN_REQ or LEAVE_REQ
780
JoinRsp join_rsp=null; // used when type=JOIN_RSP
781
Digest my_digest=null; // used when type=MERGE_RSP or INSTALL_MERGE_VIEW
782
Serializable merge_id=null; // used when type=MERGE_REQ or MERGE_RSP or INSTALL_MERGE_VIEW or CANCEL_MERGE
783
boolean merge_rejected=false; // used when type=MERGE_RSP
784

785
786         public GmsHeader() {
787         } // used for Externalization
788

789         public GmsHeader(int type) {
790             this.type=type;
791         }
792
793
794         /** Used for VIEW header */
795         public GmsHeader(int type, View view) {
796             this.type=type;
797             this.view=view;
798         }
799
800
801         /** Used for JOIN_REQ or LEAVE_REQ header */
802         public GmsHeader(int type, Address mbr) {
803             this.type=type;
804             this.mbr=mbr;
805         }
806
807         /** Used for JOIN_RSP header */
808         public GmsHeader(int type, JoinRsp join_rsp) {
809             this.type=type;
810             this.join_rsp=join_rsp;
811         }
812
813
814         public String JavaDoc toString() {
815             StringBuffer JavaDoc sb=new StringBuffer JavaDoc("GmsHeader");
816             sb.append('[' + type2String(type) + ']');
817             switch(type) {
818
819                 case JOIN_REQ:
820                     sb.append(": mbr=" + mbr);
821                     break;
822
823                 case JOIN_RSP:
824                     sb.append(": join_rsp=" + join_rsp);
825                     break;
826
827                 case LEAVE_REQ:
828                     sb.append(": mbr=" + mbr);
829                     break;
830
831                 case LEAVE_RSP:
832                     break;
833
834                 case VIEW:
835                     sb.append(": view=" + view);
836                     break;
837
838                 case MERGE_REQ:
839                     sb.append(": merge_id=" + merge_id);
840                     break;
841
842                 case MERGE_RSP:
843                     sb.append(": view=" + view + ", digest=" + my_digest + ", merge_rejected=" + merge_rejected +
844                               ", merge_id=" + merge_id);
845                     break;
846
847                 case INSTALL_MERGE_VIEW:
848                     sb.append(": view=" + view + ", digest=" + my_digest);
849                     break;
850
851                 case CANCEL_MERGE:
852                     sb.append(", <merge cancelled>, merge_id=" + merge_id);
853                     break;
854             }
855             return sb.toString();
856         }
857
858
859         public static String JavaDoc type2String(int type) {
860             switch(type) {
861                 case JOIN_REQ:
862                     return "JOIN_REQ";
863                 case JOIN_RSP:
864                     return "JOIN_RSP";
865                 case LEAVE_REQ:
866                     return "LEAVE_REQ";
867                 case LEAVE_RSP:
868                     return "LEAVE_RSP";
869                 case VIEW:
870                     return "VIEW";
871                 case MERGE_REQ:
872                     return "MERGE_REQ";
873                 case MERGE_RSP:
874                     return "MERGE_RSP";
875                 case INSTALL_MERGE_VIEW:
876                     return "INSTALL_MERGE_VIEW";
877                 case CANCEL_MERGE:
878                     return "CANCEL_MERGE";
879                 default:
880                     return "<unknown>";
881             }
882         }
883
884
885         public void writeExternal(ObjectOutput out) throws IOException {
886             out.writeInt(type);
887             out.writeObject(view);
888             out.writeObject(mbr);
889             out.writeObject(join_rsp);
890             out.writeObject(my_digest);
891             out.writeObject(merge_id);
892             out.writeBoolean(merge_rejected);
893         }
894
895
896         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
897             type=in.readInt();
898             view=(View)in.readObject();
899             mbr=(Address)in.readObject();
900             join_rsp=(JoinRsp)in.readObject();
901             my_digest=(Digest)in.readObject();
902             merge_id=(Serializable)in.readObject();
903             merge_rejected=in.readBoolean();
904         }
905
906
907         public void writeTo(DataOutputStream out) throws IOException {
908             out.writeInt(type);
909             Util.writeStreamable(view, out);
910             Util.writeAddress(mbr, out);
911             Util.writeStreamable(join_rsp, out);
912             Util.writeStreamable(my_digest, out);
913             Util.writeStreamable((Streamable)merge_id, out); // kludge: we know merge_id is a ViewId
914
out.writeBoolean(merge_rejected);
915         }
916
917         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException JavaDoc, InstantiationException JavaDoc {
918             type=in.readInt();
919             view=(View)Util.readStreamable(View.class, in);
920             mbr=Util.readAddress(in);
921             join_rsp=(JoinRsp)Util.readStreamable(JoinRsp.class, in);
922             my_digest=(Digest)Util.readStreamable(Digest.class, in);
923             merge_id=(Serializable)Util.readStreamable(ViewId.class, in);
924             merge_rejected=in.readBoolean();
925         }
926
927     }
928
929
930 }
931
Popular Tags