KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > GMS


1 // $Id: GMS.java,v 1.12 2005/03/17 19:47:22 belaban Exp $
2

3 package org.jgroups.protocols;
4
5
6 import org.jgroups.*;
7 import org.jgroups.blocks.GroupRequest;
8 import org.jgroups.blocks.MethodCall;
9 import org.jgroups.stack.Protocol;
10 import org.jgroups.stack.RpcProtocol;
11 import org.jgroups.util.Queue;
12 import org.jgroups.util.QueueClosedException;
13
14 import java.util.Hashtable JavaDoc;
15 import java.util.Properties JavaDoc;
16 import java.util.Vector JavaDoc;
17
18
19 /**
20  * Group membership protocol. Handles joins/leaves/crashes (suspicions) and emits new views
21  * accordingly. Use VIEW_ENFORCER on top of this layer to make sure new members don't receive
22  * any messages until they are members.
23  *
24  * @author Bela Ban
25  */

26 public class GMS extends RpcProtocol implements Runnable JavaDoc {
27     private GmsImpl impl=null;
28     public Address local_addr=null;
29     public String JavaDoc group_addr=null;
30     public final Membership members=new Membership();
31     public ViewId view_id=null;
32     public long ltime=0;
33     public long join_timeout=5000;
34     public long join_retry_timeout=2000;
35     private long flush_timeout=0; // 0=wait forever until FLUSH completes
36
private long rebroadcast_timeout=0; // 0=wait forever until REBROADCAST completes
37
private long view_change_timeout=10000; // until all handleViewChange() RPCs have returned
38
public long leave_timeout=5000;
39     public final Object JavaDoc impl_mutex=new Object JavaDoc(); // synchronizes event entry into impl
40
public final Object JavaDoc view_mutex=new Object JavaDoc(); // synchronizes view installations
41
private Queue event_queue=new Queue(); // stores SUSPECT, MERGE events
42
private Thread JavaDoc evt_thread=null;
43     private final Object JavaDoc flush_mutex=new Object JavaDoc();
44     private FlushRsp flush_rsp=null;
45     private final Object JavaDoc rebroadcast_mutex=new Object JavaDoc();
46     private boolean rebroadcast_unstable_msgs=true;
47     private boolean print_local_addr=true;
48     boolean disable_initial_coord=false; // can the member become a coord on startup or not ?
49
private final Hashtable JavaDoc impls=new Hashtable JavaDoc();
50     static final String JavaDoc CLIENT="Client";
51     static final String JavaDoc COORD="Coordinator";
52     static final String JavaDoc PART="Participant";
53
54
/**
 * Creates the protocol and puts it into its initial state via initState()
 * (client role, no view id, empty membership).
 */
public GMS() {
    initState();
}
58
59
60     public String JavaDoc getName() {
61         return "GMS";
62     }
63
64     public Vector JavaDoc requiredDownServices() {
65         Vector JavaDoc retval=new Vector JavaDoc();
66         retval.addElement(new Integer JavaDoc(Event.FLUSH));
67         retval.addElement(new Integer JavaDoc(Event.FIND_INITIAL_MBRS));
68         return retval;
69     }
70
71
72     public void setImpl(GmsImpl new_impl) {
73         synchronized(impl_mutex) {
74             impl=new_impl;
75              if(log.isInfoEnabled()) log.info("changed role to " + new_impl.getClass().getName());
76         }
77     }
78
79
80     public void start() throws Exception JavaDoc {
81         super.start();
82         if(checkForViewEnforcer(up_prot) == false) {
83             if(log.isWarnEnabled()) log.warn("I need protocol layer " +
84                     "VIEW_ENFORCER above me to discard messages sent to me while I'm " +
85                     "not yet a group member ! Otherwise, these messages will be delivered " +
86                     "to the application without checking...\n");
87         }
88
89         if(_corr != null)
90             _corr.setDeadlockDetection(true);
91         else
92             throw new Exception JavaDoc("GMS.start(): cannot set deadlock detection in corr, as it is null !");
93     }
94
95
96     public void becomeCoordinator() {
97         CoordGmsImpl tmp=(CoordGmsImpl)impls.get(COORD);
98
99         if(tmp == null) {
100             tmp=new CoordGmsImpl(this);
101             tmp.leaving=false;
102             tmp.received_last_view=false; // +++ ?
103
impls.put(COORD, tmp);
104         }
105
106         setImpl(tmp);
107     }
108
109
110     public void becomeParticipant() {
111         ParticipantGmsImpl tmp=(ParticipantGmsImpl)impls.get(PART);
112
113         if(tmp == null) {
114             tmp=new ParticipantGmsImpl(this);
115             tmp.leaving=false;
116             tmp.received_final_view=false;
117             impls.put(PART, tmp);
118         }
119         setImpl(tmp);
120     }
121
122     public void becomeClient() {
123         ClientGmsImpl tmp=(ClientGmsImpl)impls.get(CLIENT);
124
125         if(tmp == null) {
126             tmp=new ClientGmsImpl(this);
127             impls.put(CLIENT, tmp);
128         }
129         else
130             tmp.init();
131
132         setImpl(tmp);
133     }
134
135       boolean haveCoordinatorRole() {
136         return impl != null && impl instanceof CoordGmsImpl;
137     }
138
139
140     /**
141      * Computes the next view. Returns a copy that has <code>old_mbrs</code> and
142      * <code>suspected_mbrs</code> removed and <code>new_mbrs</code> added.
143      */

144     public View getNextView(Vector JavaDoc new_mbrs, Vector JavaDoc old_mbrs, Vector JavaDoc suspected_mbrs) {
145         Vector JavaDoc mbrs;
146         long vid=0;
147         View v;
148         Membership tmp_mbrs;
149         Vector JavaDoc mbrs_to_remove=new Vector JavaDoc();
150
151         if(old_mbrs != null && old_mbrs.size() > 0)
152             for(int i=0; i < old_mbrs.size(); i++)
153                 mbrs_to_remove.addElement(old_mbrs.elementAt(i));
154         if(suspected_mbrs != null && suspected_mbrs.size() > 0)
155             for(int i=0; i < suspected_mbrs.size(); i++)
156                 if(!mbrs_to_remove.contains(suspected_mbrs.elementAt(i)))
157                     mbrs_to_remove.addElement(suspected_mbrs.elementAt(i));
158
159         synchronized(view_mutex) {
160             vid=Math.max(view_id.getId(), ltime) + 1;
161             ltime=vid;
162             tmp_mbrs=members.copy();
163             tmp_mbrs.merge(new_mbrs, mbrs_to_remove);
164             mbrs=(Vector JavaDoc)tmp_mbrs.getMembers().clone();
165             v=new View(local_addr, vid, mbrs);
166             return v;
167         }
168     }
169
170
171     /**
172      * Return a copy of the current membership minus the suspected members: FLUSH request is not sent
173      * to suspected members (because they won't respond, and not to joining members either.
174      * It IS sent to leaving members (before they are allowed to leave).
175      */

176     Vector JavaDoc computeFlushDestination(Vector JavaDoc suspected_mbrs) {
177         Vector JavaDoc ret=members.getMembers(); // *copy* of current membership
178
if(suspected_mbrs != null && suspected_mbrs.size() > 0)
179             for(int i=0; i < suspected_mbrs.size(); i++)
180                 ret.removeElement(suspected_mbrs.elementAt(i));
181         return ret;
182     }
183
184
185     /**
186      * Compute the destination set to which to send a VIEW_CHANGE message. This is the current
187      * members + the leaving members (old_mbrs) + the joining members (new_mbrs) - the suspected
188      * members.
189      */

190     Vector JavaDoc computeViewDestination(Vector JavaDoc new_mbrs, Vector JavaDoc old_mbrs, Vector JavaDoc suspected_mbrs) {
191         Vector JavaDoc ret=members.getMembers(); // **copy* of current membership
192
Address mbr;
193
194         // add new members
195
if(new_mbrs != null) {
196             for(int i=0; i < new_mbrs.size(); i++) {
197                 mbr=(Address)new_mbrs.elementAt(i);
198                 if(!ret.contains(mbr))
199                     ret.addElement(new_mbrs.elementAt(i));
200             }
201         }
202
203         // old members are still in existing membership, don't need to add them explicitely
204

205
206         // remove suspected members
207
if(suspected_mbrs != null) {
208             for(int i=0; i < suspected_mbrs.size(); i++) {
209                 mbr=(Address)suspected_mbrs.elementAt(i);
210                 ret.removeElement(suspected_mbrs.elementAt(i));
211             }
212         }
213         return ret;
214     }
215
216     /**
217      * FLUSH protocol.
218      * Send to current mbrs - suspected_mbrs (not including new_mbrs, but including old_mbr)
219      * Send TMP_VIEW event down,
220      * this allows FLUSH/NAKACK to set membership correctly
221      */

222
223     public void flush(Vector JavaDoc flush_dest, Vector JavaDoc suspected_mbrs) {
224         Vector JavaDoc rebroadcast_msgs=new Vector JavaDoc();
225
226         if(suspected_mbrs == null)
227             suspected_mbrs=new Vector JavaDoc();
228
229         while(flush_dest.size() > 0) {
230             flush_rsp=null;
231             synchronized(flush_mutex) {
232                 passDown(new Event(Event.FLUSH, flush_dest)); // send FLUSH to members in flush_dest
233
if(flush_rsp == null) {
234                     try {
235                         flush_mutex.wait(flush_timeout);
236                     }
237                     catch(Exception JavaDoc e) {
238                     }
239                 }
240             }
241             if(flush_rsp == null) {
242                 break;
243             }
244
245             if(rebroadcast_unstable_msgs && flush_rsp.unstable_msgs != null &&
246                     flush_rsp.unstable_msgs.size() > 0) {
247                 Message m;
248                 for(int i=0; i < flush_rsp.unstable_msgs.size(); i++) {
249                     m=(Message)flush_rsp.unstable_msgs.elementAt(i);
250
251                     // just add msg, NAKACK.RESEND will weed out duplicates based on
252
// <sender:id> before re-broadcasting msgs
253
rebroadcast_msgs.addElement(m);
254                 }
255             }
256
257             if(flush_rsp.result == true)
258                 break;
259             else {
260                 if(flush_rsp.failed_mbrs != null) {
261                     for(int i=0; i < flush_rsp.failed_mbrs.size(); i++) {
262                         flush_dest.removeElement(flush_rsp.failed_mbrs.elementAt(i));
263                         suspected_mbrs.addElement(flush_rsp.failed_mbrs.elementAt(i));
264                     }
265                 }
266             }
267         } // while
268
if(log.isInfoEnabled()) log.info("flushing completed.");
269
270
271         // Rebroadcast unstable messages
272
if(rebroadcast_unstable_msgs && rebroadcast_msgs.size() > 0) {
273
274                 if(log.isInfoEnabled()) log.info("re-broadcasting unstable messages (" +
275                         rebroadcast_msgs.size() + ')');
276             // NAKACK layer will rebroadcast the msgs (using the same seqnos assigned earlier)
277
synchronized(rebroadcast_mutex) {
278                 passDown(new Event(Event.REBROADCAST_MSGS, rebroadcast_msgs));
279                 try {
280                     rebroadcast_mutex.wait(rebroadcast_timeout);
281                 }
282                 catch(Exception JavaDoc e) {
283                 }
284             }
285              if(log.isInfoEnabled()) log.info("re-broadcasting messages completed");
286         }
287     }
288
289     /**
290      * Compute a new view, given the current view, the new members and the suspected/left
291      * members. Run view update protocol to install a new view in all members (this involves
292      * casting the new view to all members). The targets for FLUSH and VIEW mcasts are
293      * computed as follows:<p>
294      * <pre>
295      * existing leaving suspected joining
296      * <p/>
297      * 1. FLUSH y y n n
298      * 2. new_view y n n y
299      * 3. tmp_view y y n y
300      * (view_dest)
301      * </pre>
302      * <p/>
303      * <ol>
304      * <li>
305      * The FLUSH is only sent to the existing and leaving members (they are the only ones that might have
306      * old messages not yet seen by the group. The suspected members would not answer anyway (because they
307      * have failed) and the joining members have certainly no old messages.
308      * <li>
309      * The new view to be installed includes the existing members plus the joining ones and
310      * excludes the leaving and suspected members.
311      * <li>
312      * A temporary view is sent down the stack as an <em>event</em>. This allows the bottom layer
313      * (e.g. UDP or TCP) to determine the members to which to send a multicast message. Compared
314      * to the new view, leaving members are <em>included</em> since they have are waiting for a
315      * view in which they are not members any longer before they leave. So, if we did not set a
316      * temporary view, joining members would not receive the view (signalling that they have been
317      * joined successfully). The temporary view is essentially the current view plus the joining
318      * members (old members are still part of the current view).
319      * </ol>
320      */

321     public void castViewChange(Vector JavaDoc new_mbrs, Vector JavaDoc old_mbrs, Vector JavaDoc suspected_mbrs) {
322         View new_view, tmp_view;
323         ViewId new_vid;
324         Vector JavaDoc flush_dest=computeFlushDestination(suspected_mbrs); // members to which FLUSH/VIEW is sent
325
Vector JavaDoc view_dest=computeViewDestination(new_mbrs, old_mbrs, suspected_mbrs); // dest for view change
326

327         // next view: current mbrs + new_mbrs - old_mbrs - suspected_mbrs
328
new_view=getNextView(new_mbrs, old_mbrs, suspected_mbrs);
329         new_vid=new_view.getVid();
330
331         if(log.isInfoEnabled()) log.info("FLUSH phase, flush_dest: " + flush_dest +
332                                          "\n\tview_dest: " + view_dest + "\n\tnew_view: " + new_view + '\n');
333         flush(flush_dest, suspected_mbrs);
334         if(log.isInfoEnabled())
335             log.info("FLUSH phase done");
336
337         /* VIEW protocol. Send to current mbrs + new_mbrs + old_mbrs - suspected_mbrs. Since
338            suspected members were removed from view_dest during the previous FLUSH round(s), we
339            only need to add the new members. Send TMP_VIEW event down, this allows
340            FLUSH/NAKACK to set membership correctly */

341         view_dest=computeViewDestination(new_mbrs, old_mbrs, suspected_mbrs);
342         tmp_view=new View(null, view_dest);
343
344         Event view_event=new Event(Event.TMP_VIEW, tmp_view); // so the VIEW msg is sent to the correct mbrs
345
passDown(view_event); // needed e.g. by failure detector or UDP
346

347          if(log.isInfoEnabled()) log.info("mcasting view {" + new_vid + ", " + view_dest + '}');
348         passDown(new Event(Event.SWITCH_NAK_ACK)); // use ACK scheme for view bcast
349
Object JavaDoc[] args=new Object JavaDoc[]{new_vid, new_view.getMembers() /* these are the mbrs in the new view */};
350         MethodCall call=new MethodCall("handleViewChange", args, new String JavaDoc[]{ViewId.class.getName(), Vector JavaDoc.class.getName()});
351         callRemoteMethods(view_dest, // send to all members in 'view_dest'
352
call,
353                 GroupRequest.GET_ALL, view_change_timeout);
354          if(log.isInfoEnabled()) log.info("mcasting view completed");
355         passDown(new Event(Event.SWITCH_NAK)); // back to normal NAKs ...
356
}
357
358
359     /**
360      * Assigns the new ltime. Installs view and view_id. Changes role to coordinator if necessary.
361      * Sends VIEW_CHANGE event up and down the stack.
362      */

363     public void installView(ViewId new_view, Vector JavaDoc mbrs) {
364         Object JavaDoc coord;
365         int rc;
366
367         synchronized(view_mutex) { // serialize access to views
368
ltime=Math.max(new_view.getId(), ltime); // compute Lamport logical time
369
if(log.isInfoEnabled()) log.info("received view change, vid=" + new_view);
370
371             /* Check for self-inclusion: if I'm not part of the new membership, I just discard it.
372                This ensures that messages sent in view V1 are only received by members of V1 */

373             if(checkSelfInclusion(mbrs) == false) {
374                 if(log.isWarnEnabled()) log.warn("I'm not member of " + mbrs + ", discarding");
375                 return;
376             }
377
378
379             if(view_id == null) {
380                 if(new_view == null) {
381                     if(log.isErrorEnabled()) log.error("view_id and new_view are null !");
382                     return;
383                 }
384                 else { // view_id is null, new_view not: just install new_view (we're still a client)
385
view_id=(ViewId)new_view.clone();
386                 }
387             }
388             else {
389                 if(new_view == null) { // this should never happen though !
390
if(log.isErrorEnabled()) log.error("new_view is null !");
391                     return;
392                 }
393                 else { // both view_id and new_view are not null
394
rc=new_view.compareTo(view_id); // rc should always be a positive number
395
if(rc <= 0) { // don't accept view id lower than our own
396
{
397                             if(log.isWarnEnabled()) log.warn("received view <= current view; discarding it ! " +
398                                     "(view_id: " + view_id + ", new_view: " + new_view + ')');
399                         }
400                         return;
401                     }
402                     else { // the check for vid equality was okay, assign new_view to view_id
403

404                         if(new_view.getCoordAddress() != null) {
405                             view_id=new ViewId(new_view.getCoordAddress(), new_view.getId());
406                         }
407                         else {
408                             view_id=new ViewId(view_id.getCoordAddress(), new_view.getId());
409                         }
410                     }
411                 }
412             }
413
414             if(mbrs != null && mbrs.size() > 0)
415                 members.set(mbrs);
416
417
418
419             // Send VIEW_CHANGE event up and down the stack:
420
Event view_event=new Event(Event.VIEW_CHANGE, makeView(members.getMembers()));
421             passDown(view_event); // needed e.g. by failure detector or UDP
422
passUp(view_event);
423
424             coord=determineCoordinator();
425             if(coord != null && coord.equals(local_addr)) {
426                if (! haveCoordinatorRole()) // this avoids deadlock on coordinator - when suspect/join occurs simultaneously
427
becomeCoordinator();
428             }
429             else {
430                 if(haveCoordinatorRole() && !local_addr.equals(coord))
431                     becomeParticipant();
432             }
433         }
434     }
435
436
437     protected Address determineCoordinator() {
438         synchronized(members) {
439             return members != null && members.size() > 0? (Address)members.elementAt(0) : null;
440         }
441     }
442
443
444     /**
445      * Returns true if local_addr is member of mbrs, else false
446      */

447     protected boolean checkSelfInclusion(Vector JavaDoc mbrs) {
448         Object JavaDoc mbr;
449         if(mbrs == null)
450             return false;
451         for(int i=0; i < mbrs.size(); i++) {
452             mbr=mbrs.elementAt(i);
453             if(mbr != null && local_addr.equals(mbr))
454                 return true;
455         }
456         return false;
457     }
458
459
460     public View makeView(Vector JavaDoc mbrs) {
461         Address coord=null;
462         long id=0;
463
464         if(view_id != null) {
465             coord=view_id.getCoordAddress();
466             id=view_id.getId();
467         }
468         return new View(coord, id, mbrs);
469     }
470
471
472     public View makeView(Vector JavaDoc mbrs, ViewId vid) {
473         Address coord=null;
474         long id=0;
475
476         if(vid != null) {
477             coord=vid.getCoordAddress();
478             id=vid.getId();
479         }
480         return new View(coord, id, mbrs);
481     }
482
483
484
485
486     /* ------------------------- Request handler methods ----------------------------- */
487
488     public void join(Address mbr) {
489         synchronized(impl_mutex) {
490             impl.join(mbr);
491         }
492     }
493
494     public void leave(Address mbr) {
495         synchronized(impl_mutex) {
496             impl.leave(mbr);
497         }
498     }
499
500     public void suspect(Address mbr) {
501         synchronized(impl_mutex) {
502             impl.suspect(mbr);
503         }
504     }
505
506     public void merge(Vector JavaDoc other_coords) {
507         synchronized(impl_mutex) {
508             impl.merge(other_coords);
509         }
510     }
511
512     public boolean handleJoin(Address mbr) {
513         synchronized(impl_mutex) {
514             return impl.handleJoin(mbr);
515         }
516     }
517
518     public void handleLeave(Address mbr, boolean suspected) {
519         synchronized(impl_mutex) {
520             impl.handleLeave(mbr, suspected);
521         }
522     }
523
524     public void handleViewChange(ViewId new_view, Vector JavaDoc mbrs) {
525 // synchronized (impl_mutex ) {
526
impl.handleViewChange(new_view, mbrs);
527 // }
528
}
529
530     public View handleMerge(ViewId other_vid, Vector JavaDoc other_members) {
531         synchronized(impl_mutex) {
532             if(log.isTraceEnabled())
533             {
534                 View v=impl.handleMerge(other_vid, other_members);
535                 if(log.isInfoEnabled()) log.info("returning view: " + v);
536                 return v;
537             }
538             return impl.handleMerge(other_vid, other_members);
539         }
540     }
541
542     public void handleSuspect(Address mbr) {
543         synchronized(impl_mutex) {
544             impl.handleSuspect(mbr);
545         }
546     }
547
548     /* --------------------- End of Request handler methods -------------------------- */
549
550
551
552     boolean checkForViewEnforcer(Protocol up_protocol) {
553         String JavaDoc prot_name;
554
555         if(up_protocol == null)
556             return false;
557         prot_name=up_protocol.getName();
558         if(prot_name != null && "VIEW_ENFORCER".equals(prot_name))
559             return true;
560         return checkForViewEnforcer(up_protocol.getUpProtocol());
561     }
562
563
564     /**
565      * <b>Callback</b>. Called by superclass when event may be handled.<p>
566      * <b>Do not use <code>PassUp</code> in this method as the event is passed up
567      * by default by the superclass after this method returns !</b>
568      *
569      * @return boolean Defaults to true. If false, event will not be passed up the stack.
570      */

571     public boolean handleUpEvent(Event evt) {
572         switch(evt.getType()) {
573
574             case Event.CONNECT_OK: // sent by someone else, but WE are responsible for sending this !
575
case Event.DISCONNECT_OK: // dito (e.g. sent by UDP layer)
576
return false;
577
578
579             case Event.SET_LOCAL_ADDRESS:
580                 local_addr=(Address)evt.getArg();
581
582                 if(print_local_addr) {
583                     System.out.println("\n-------------------------------------------------------\n" +
584                             "GMS: address is " + local_addr +
585                             "\n-------------------------------------------------------");
586                 }
587                 return true; // pass up
588

589             case Event.SUSPECT:
590                 try {
591                     event_queue.add(evt);
592                 }
593                 catch(Exception JavaDoc e) {
594                 }
595                 return true; // pass up
596

597             case Event.MERGE:
598                 try {
599                     event_queue.add(evt);
600                 }
601                 catch(Exception JavaDoc e) {
602                 }
603                 return false; // don't pass up
604

605
606             case Event.FLUSH_OK:
607                 synchronized(flush_mutex) {
608                     flush_rsp=(FlushRsp)evt.getArg();
609                     flush_mutex.notifyAll();
610                 }
611                 return false; // don't pass up
612

613             case Event.REBROADCAST_MSGS_OK:
614                 synchronized(rebroadcast_mutex) {
615                     rebroadcast_mutex.notifyAll();
616                 }
617                 return false; // don't pass up
618
}
619
620         return impl.handleUpEvent(evt);
621     }
622
623
624     /**
625      * <b>Callback</b>. Called by superclass when event may be handled.<p>
626      * <b>Do not use <code>PassDown</code> in this method as the event is passed down
627      * by default by the superclass after this method returns !</b>
628      *
629      * @return boolean Defaults to true. If false, event will not be passed down the stack.
630      */

631     public boolean handleDownEvent(Event evt) {
632         switch(evt.getType()) {
633
634             case Event.CONNECT:
635                 passDown(evt);
636                 try {
637                     group_addr=(String JavaDoc)evt.getArg();
638                 }
639                 catch(ClassCastException JavaDoc cce) {
640                     if(log.isErrorEnabled()) log.error("group address must " +
641                             "be a string (group name) to make sense");
642                 }
643                 impl.join(local_addr);
644                 passUp(new Event(Event.CONNECT_OK));
645                 startEventHandlerThread();
646                 return false; // don't pass down: was already passed down
647

648             case Event.DISCONNECT:
649                 impl.leave((Address)evt.getArg());
650                 passUp(new Event(Event.DISCONNECT_OK));
651                 stopEventHandlerThread();
652                 initState();
653                 return true; // pass down
654
}
655
656         return impl.handleDownEvent(evt);
657     }
658
659
660     // Priority handling, otherwise GMS.down(DISCONNECT) would block !
661
// Similar to FLUSH protocol
662
public void receiveDownEvent(Event evt) {
663         if(evt.getType() == Event.BLOCK_OK) {
664             passDown(evt);
665             return;
666         }
667         super.receiveDownEvent(evt);
668     }
669
670
671     /**
672      * Setup the Protocol instance acording to the configuration string
673      */

674     public boolean setProperties(Properties JavaDoc props) {
675         String JavaDoc str;
676
677         super.setProperties(props);
678         str=props.getProperty("join_timeout"); // time to wait for JOIN
679
if(str != null) {
680             join_timeout=Long.parseLong(str);
681             props.remove("join_timeout");
682         }
683
684         str=props.getProperty("print_local_addr");
685         if(str != null) {
686             print_local_addr=Boolean.valueOf(str).booleanValue();
687             props.remove("print_local_addr");
688         }
689
690         str=props.getProperty("view_change_timeout"); // time to wait for VIEW_CHANGE
691
if(str != null) {
692             view_change_timeout=Long.parseLong(str);
693             props.remove("view_change_timeout");
694         }
695
696         str=props.getProperty("join_retry_timeout"); // time to wait between JOINs
697
if(str != null) {
698             join_retry_timeout=Long.parseLong(str);
699             props.remove("join_retry_timeout");
700         }
701
702         str=props.getProperty("leave_timeout"); // time to wait until coord responds to LEAVE req.
703
if(str != null) {
704             leave_timeout=Long.parseLong(str);
705             props.remove("leave_timeout");
706         }
707
708         str=props.getProperty("flush_timeout"); // time to wait until FLUSH completes (0=forever)
709
if(str != null) {
710             flush_timeout=Long.parseLong(str);
711             props.remove("flush_timeout");
712         }
713
714         str=props.getProperty("rebroadcast_unstable_msgs"); // bcast unstable msgs (recvd from FLUSH)
715
if(str != null) {
716             rebroadcast_unstable_msgs=Boolean.valueOf(str).booleanValue();
717             props.remove("rebroadcast_unstable_msgs");
718         }
719
720         str=props.getProperty("rebroadcast_timeout"); // time to wait until REBROADCAST_MSGS completes
721
if(str != null) {
722             rebroadcast_timeout=Long.parseLong(str);
723             props.remove("rebroadcast_timeout");
724         }
725
726         str=props.getProperty("disable_initial_coord"); // allow initial mbr to become coord or not
727
if(str != null) {
728             disable_initial_coord=Boolean.valueOf(str).booleanValue();
729             props.remove("disable_initial_coord");
730         }
731
732         if(props.size() > 0) {
733             System.err.println("GMS.setProperties(): the following properties are not recognized:");
734             props.list(System.out);
735             return false;
736         }
737         return true;
738     }
739
740
741     public void run() {
742         Event evt;
743
744         while(evt_thread != null && event_queue != null) {
745             try {
746                 evt=(Event)event_queue.remove();
747                 switch(evt.getType()) {
748                     case Event.SUSPECT:
749                         impl.suspect((Address)evt.getArg());
750                         break;
751                     case Event.MERGE:
752                         impl.merge((Vector JavaDoc)evt.getArg());
753                         break;
754                     default:
755                         if(log.isErrorEnabled()) log.error("event handler thread encountered event of type " +
756                                 Event.type2String(evt.getType()) + ": not handled by me !");
757                         break;
758                 }
759             }
760             catch(QueueClosedException closed) {
761                 break;
762             }
763             catch(Exception JavaDoc ex) {
764                 if(log.isWarnEnabled()) log.warn("exception=" + ex);
765             }
766         }
767     }
768
769
770
771     /* ------------------------------- Private Methods --------------------------------- */
772
773
774     void initState() {
775         becomeClient();
776         impl.init();
777         view_id=null;
778         if(members != null)
779             members.clear();
780     }
781
782
783     private void startEventHandlerThread() {
784         if(event_queue == null)
785             event_queue=new Queue();
786         if(evt_thread == null) {
787             evt_thread=new Thread JavaDoc(this, "GMS.EventHandlerThread");
788             evt_thread.setDaemon(true);
789             evt_thread.start();
790         }
791     }
792
793
794     private void stopEventHandlerThread() {
795         if(evt_thread != null) {
796             event_queue.close(false);
797             event_queue=null;
798             evt_thread=null;
799             return;
800         }
801
802         if(event_queue != null) {
803             event_queue.close(false);
804             event_queue=null;
805         }
806     }
807
808
809 }
810
Popular Tags