KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > pbcast > ClientGmsImpl


1 // $Id: ClientGmsImpl.java,v 1.18 2005/04/12 18:59:49 belaban Exp $
2

3 package org.jgroups.protocols.pbcast;
4
5
6 import org.jgroups.*;
7 import org.jgroups.protocols.PingRsp;
8 import org.jgroups.util.Promise;
9 import org.jgroups.util.Util;
10
11 import java.util.*;
12
13
14 /**
15  * Client part of GMS. Whenever a new member wants to join a group, it starts in the CLIENT role.
16  * No multicasts to the group will be received and processed until the member has been joined and
17  * turned into a SERVER (either coordinator or participant, mostly just participant). This class
18  * only implements <code>Join</code> (called by clients who want to join a certain group, and
19  * <code>ViewChange</code> which is called by the coordinator that was contacted by this client, to
20  * tell the client what its initial membership is.
21  * @author Bela Ban
22  * @version $Revision: 1.18 $
23  */

24 public class ClientGmsImpl extends GmsImpl {
25     private final Vector initial_mbrs=new Vector(11);
26     private boolean initial_mbrs_received=false;
27     private final Promise join_promise=new Promise();
28
29
30     public ClientGmsImpl(GMS g) {
31         gms=g;
32     }
33
34     public void init() throws Exception JavaDoc {
35         super.init();
36         synchronized(initial_mbrs) {
37             initial_mbrs.clear();
38             initial_mbrs_received=false;
39         }
40         join_promise.reset();
41     }
42
43
44     /**
45      * Joins this process to a group. Determines the coordinator and sends a unicast
46      * handleJoin() message to it. The coordinator returns a JoinRsp and then broadcasts the new view, which
47      * contains a message digest and the current membership (including the joiner). The joiner is then
48      * supposed to install the new view and the digest and starts accepting mcast messages. Previous
49      * mcast messages were discarded (this is done in PBCAST).<p>
50      * If successful, impl is changed to an instance of ParticipantGmsImpl.
51      * Otherwise, we continue trying to send join() messages to the coordinator,
52      * until we succeed (or there is no member in the group. In this case, we create our own singleton group).
53      * <p>When GMS.disable_initial_coord is set to true, then we won't become coordinator on receiving an initial
54      * membership of 0, but instead will retry (forever) until we get an initial membership of > 0.
55      * @param mbr Our own address (assigned through SET_LOCAL_ADDRESS)
56      */

57     public void join(Address mbr) {
58         Address coord=null;
59         JoinRsp rsp=null;
60         Digest tmp_digest=null;
61         leaving=false;
62
63         join_promise.reset();
64         while(!leaving) {
65             findInitialMembers();
66             if(log.isDebugEnabled()) log.debug("initial_mbrs are " + initial_mbrs);
67             if(initial_mbrs.size() == 0) {
68                 if(gms.disable_initial_coord) {
69                     if(log.isTraceEnabled())
70                         log.trace("received an initial membership of 0, but cannot become coordinator " +
71                                   "(disable_initial_coord=true), will retry fetching the initial membership");
72                     continue;
73                 }
74                 if(log.isDebugEnabled())
75                     log.debug("no initial members discovered: creating group as first member");
76                 becomeSingletonMember(mbr);
77                 return;
78             }
79
80             coord=determineCoord(initial_mbrs);
81             if(coord == null) { // e.g. because we have all clients only
82
if(log.isTraceEnabled())
83                     log.trace("could not determine coordinator from responses " + initial_mbrs);
84
85                 // so the member to become singleton member (and thus coord) is the first of all clients
86
Set clients=new TreeSet(); // sorted
87
clients.add(mbr); // add myself again (was removed by findInitialMembers())
88
for(int i=0; i < initial_mbrs.size(); i++) {
89                     PingRsp pingRsp=(PingRsp)initial_mbrs.elementAt(i);
90                     Address client_addr=pingRsp.getAddress();
91                     if(client_addr != null)
92                         clients.add(client_addr);
93                 }
94                 if(log.isTraceEnabled())
95                     log.trace("clients to choose new coord from are: " + clients);
96                 Address new_coord=(Address)clients.iterator().next();
97                 if(new_coord.equals(mbr)) {
98                     if(log.isTraceEnabled())
99                         log.trace("I'm the first of the clients, will become singleton");
100                     becomeSingletonMember(mbr);
101                     return;
102                 }
103                 else {
104                     if(log.isTraceEnabled())
105                         log.trace("I'm not the first of the clients, waiting for another client to become coord");
106                     Util.sleep(500);
107                 }
108                 continue;
109             }
110
111             try {
112                 if(log.isDebugEnabled())
113                     log.debug("sending handleJoin(" + mbr + ") to " + coord);
114                 sendJoinMessage(coord, mbr);
115                 rsp=(JoinRsp)join_promise.getResult(gms.join_timeout);
116
117                 if(rsp == null) {
118                     if(log.isWarnEnabled()) log.warn("join(" + mbr + ") failed, retrying");
119                 }
120                 else {
121                     // 1. Install digest
122
tmp_digest=rsp.getDigest();
123                     if(tmp_digest != null) {
124                         tmp_digest.incrementHighSeqno(coord); // see DESIGN for an explanantion
125
if(log.isDebugEnabled()) log.debug("digest is " + tmp_digest);
126                         gms.setDigest(tmp_digest);
127                     }
128                     else
129                         if(log.isErrorEnabled()) log.error("digest of JOIN response is null");
130
131                     // 2. Install view
132
if(log.isDebugEnabled()) log.debug("[" + gms.local_addr + "]: JoinRsp=" + rsp.getView() +
133                             " [size=" + rsp.getView().size() + "]\n\n");
134
135                     if(rsp.getView() != null) {
136                         if(!installView(rsp.getView())) {
137                             if(log.isErrorEnabled()) log.error("view installation failed, retrying to join group");
138                             continue;
139                         }
140                         gms.passUp(new Event(Event.BECOME_SERVER));
141                         gms.passDown(new Event(Event.BECOME_SERVER));
142                         return;
143                     }
144                     else
145                         if(log.isErrorEnabled()) log.error("view of JOIN response is null");
146                 }
147             }
148             catch(Exception JavaDoc e) {
149                 if(log.isDebugEnabled()) log.debug("exception=" + e.toString() + ", retrying");
150             }
151
152             Util.sleep(gms.join_retry_timeout);
153         }
154     }
155
156
157     public void leave(Address mbr) {
158         leaving=true;
159         wrongMethod("leave");
160     }
161
162
163     public void handleJoinResponse(JoinRsp join_rsp) {
164         join_promise.setResult(join_rsp); // will wake up join() method
165
}
166
167     public void handleLeaveResponse() {
168         ; // safely ignore this
169
}
170
171
172     public void suspect(Address mbr) {
173         ;
174     }
175
176     public void unsuspect(Address mbr) {
177         wrongMethod("unsuspect");
178     }
179
180
181     public JoinRsp handleJoin(Address mbr) {
182         wrongMethod("handleJoin");
183         return null;
184     }
185
186
187     /** Returns false. Clients don't handle leave() requests */
188     public void handleLeave(Address mbr, boolean suspected) {
189         wrongMethod("handleLeave");
190     }
191
192
193     /**
194      * Does nothing. Discards all views while still client.
195      */

196     public synchronized void handleViewChange(View new_view, Digest digest) {
197         if(log.isDebugEnabled()) log.debug("view " + new_view.getMembers() +
198                                            " is discarded as we are not a participant");
199     }
200
201
202     /**
203      * Called by join(). Installs the view returned by calling Coord.handleJoin() and
204      * becomes coordinator.
205      */

206     private boolean installView(View new_view) {
207         Vector mems=new_view.getMembers();
208          if(log.isDebugEnabled()) log.debug("new_view=" + new_view);
209         if(gms.local_addr == null || mems == null || !mems.contains(gms.local_addr)) {
210             if(log.isErrorEnabled()) log.error("I (" + gms.local_addr +
211                                                        ") am not member of " + mems + ", will not install view");
212             return false;
213         }
214         gms.installView(new_view);
215         gms.becomeParticipant();
216         gms.passUp(new Event(Event.BECOME_SERVER));
217         gms.passDown(new Event(Event.BECOME_SERVER));
218         return true;
219     }
220
221
222     /** Returns immediately. Clients don't handle suspect() requests */
223     public void handleSuspect(Address mbr) {
224         wrongMethod("handleSuspect");
225         return;
226     }
227
228
229     public boolean handleUpEvent(Event evt) {
230         Vector tmp;
231
232         switch(evt.getType()) {
233
234             case Event.FIND_INITIAL_MBRS_OK:
235                 tmp=(Vector)evt.getArg();
236                 synchronized(initial_mbrs) {
237                     if(tmp != null && tmp.size() > 0) {
238                         initial_mbrs.addAll(tmp);
239                     }
240                     initial_mbrs_received=true;
241                     initial_mbrs.notifyAll();
242                 }
243                 return false; // don't pass up the stack
244
}
245         return true;
246     }
247
248
249
250
251
252     /* --------------------------- Private Methods ------------------------------------ */
253
254
255
256     void sendJoinMessage(Address coord, Address mbr) {
257         Message msg;
258         GMS.GmsHeader hdr;
259
260         msg=new Message(coord, null, null);
261         hdr=new GMS.GmsHeader(GMS.GmsHeader.JOIN_REQ, mbr);
262         msg.putHeader(gms.getName(), hdr);
263         gms.passDown(new Event(Event.MSG, msg));
264     }
265
266
267     /**
268      * Pings initial members. Removes self before returning vector of initial members.
269      * Uses IP multicast or gossiping, depending on parameters.
270      */

271     void findInitialMembers() {
272         PingRsp ping_rsp;
273
274         synchronized(initial_mbrs) {
275             initial_mbrs.removeAllElements();
276             initial_mbrs_received=false;
277             gms.passDown(new Event(Event.FIND_INITIAL_MBRS));
278
279             // the initial_mbrs_received flag is needed when passDown() is executed on the same thread, so when
280
// it returns, a response might actually have been received (even though the initial_mbrs might still be empty)
281
if(initial_mbrs_received == false) {
282                 try {
283                     initial_mbrs.wait();
284                 }
285                 catch(Exception JavaDoc e) {
286                 }
287             }
288
289             for(int i=0; i < initial_mbrs.size(); i++) {
290                 ping_rsp=(PingRsp)initial_mbrs.elementAt(i);
291                 if(ping_rsp.own_addr != null && gms.local_addr != null &&
292                         ping_rsp.own_addr.equals(gms.local_addr)) {
293                     initial_mbrs.removeElementAt(i);
294                     break;
295                 }
296             }
297         }
298     }
299
300
301     /**
302      The coordinator is determined by a majority vote. If there are an equal number of votes for
303      more than 1 candidate, we determine the winner randomly.
304      */

305     Address determineCoord(Vector mbrs) {
306         PingRsp mbr;
307         Hashtable votes;
308         int count, most_votes;
309         Address winner=null, tmp;
310
311         if(mbrs == null || mbrs.size() < 1)
312             return null;
313
314         votes=new Hashtable(5);
315
316         // count *all* the votes (unlike the 2000 election)
317
for(int i=0; i < mbrs.size(); i++) {
318             mbr=(PingRsp)mbrs.elementAt(i);
319             if(mbr.is_server && mbr.coord_addr != null) {
320                 if(!votes.containsKey(mbr.coord_addr))
321                     votes.put(mbr.coord_addr, new Integer JavaDoc(1));
322                 else {
323                     count=((Integer JavaDoc)votes.get(mbr.coord_addr)).intValue();
324                     votes.put(mbr.coord_addr, new Integer JavaDoc(count + 1));
325                 }
326             }
327         }
328
329         if(log.isDebugEnabled()) {
330             if(votes.size() > 1)
331                 if(log.isWarnEnabled()) log.warn("there was more than 1 candidate for coordinator: " + votes);
332                 else
333                     if(log.isDebugEnabled()) log.debug("election results: " + votes);
334         }
335
336
337         // determine who got the most votes
338
most_votes=0;
339         for(Enumeration e=votes.keys(); e.hasMoreElements();) {
340             tmp=(Address)e.nextElement();
341             count=((Integer JavaDoc)votes.get(tmp)).intValue();
342             if(count > most_votes) {
343                 winner=tmp;
344                 // fixed July 15 2003 (patch submitted by Darren Hobbs, patch-id=771418)
345
most_votes=count;
346             }
347         }
348         votes.clear();
349         return winner;
350     }
351
352
353     void becomeSingletonMember(Address mbr) {
354         Digest initial_digest;
355         ViewId view_id=null;
356         Vector mbrs=new Vector(1);
357
358         // set the initial digest (since I'm the first member)
359
initial_digest=new Digest(1); // 1 member (it's only me)
360
initial_digest.add(gms.local_addr, 0, 0); // initial seqno mcast by me will be 1 (highest seen +1)
361
gms.setDigest(initial_digest);
362
363         view_id=new ViewId(mbr); // create singleton view with mbr as only member
364
mbrs.addElement(mbr);
365         gms.installView(new View(view_id, mbrs));
366         gms.becomeCoordinator(); // not really necessary - installView() should do it
367

368         gms.passUp(new Event(Event.BECOME_SERVER));
369         gms.passDown(new Event(Event.BECOME_SERVER));
370         if(log.isDebugEnabled()) log.debug("created group (first member). My view is " + gms.view_id +
371                                            ", impl is " + gms.getImpl().getClass().getName());
372     }
373
374
375 }
376
Popular Tags