1 3 package org.jgroups.protocols; 4 5 6 import org.jgroups.Address; 7 import org.jgroups.Event; 8 import org.jgroups.View; 9 import org.jgroups.ViewId; 10 import org.jgroups.blocks.GroupRequest; 11 import org.jgroups.blocks.MethodCall; 12 import org.jgroups.util.Util; 13 14 import java.util.Enumeration ; 15 import java.util.Hashtable ; 16 import java.util.Vector ; 17 18 19 30 public class ClientGmsImpl extends GmsImpl { 31 final Vector initial_mbrs=new Vector (7); 32 final Object view_installation_mutex=new Object (); 33 boolean joined=false; 34 35 36 public ClientGmsImpl(GMS g) { 37 gms=g; 38 } 39 40 41 public void init() { 42 initial_mbrs.removeAllElements(); 43 joined=false; 44 } 45 46 47 59 public void join(Address mbr) { 60 Address coord=null; 61 Event view_evt; 62 63 while(!joined) { 64 findInitialMembers(); 65 if(joined) { 66 if(log.isInfoEnabled()) log.info("joined successfully"); 67 return; 68 } 69 if(initial_mbrs.size() == 0) { 70 if(gms.disable_initial_coord) { 71 72 if(log.isInfoEnabled()) log.info("received an initial membership of 0, but " + 73 "cannot become coordinator (disable_initial_coord=" + gms.disable_initial_coord + 74 "), will retry fetching the initial membership"); 75 continue; 76 } 77 joined=true; 78 gms.view_id=new ViewId(mbr); gms.members.add(mbr); 80 view_evt=new Event(Event.VIEW_CHANGE, 81 gms.makeView(gms.members.getMembers(), gms.view_id)); 82 gms.passDown(view_evt); 83 gms.passUp(view_evt); 84 gms.becomeCoordinator(); 85 86 gms.passUp(new Event(Event.BECOME_SERVER)); 87 gms.passDown(new Event(Event.BECOME_SERVER)); 88 if(log.isInfoEnabled()) log.info("created group (first member)"); 89 break; 90 } 91 92 coord=determineCoord(initial_mbrs); 93 if(coord == null) { 94 if(log.isWarnEnabled()) log.warn("could not determine coordinator " + 95 "from responses " + initial_mbrs); 96 continue; 97 } 98 99 synchronized(view_installation_mutex) { 100 try { 101 if(log.isInfoEnabled()) log.info("sending handleJoin() to " + coord); 102 MethodCall call=new MethodCall("handleJoin", new Object []{mbr}, new Class []{Address.class}); 103 gms.callRemoteMethod(coord, call, GroupRequest.GET_NONE, 0); 104 view_installation_mutex.wait(gms.join_timeout); } 106 catch(Exception e) { 107 if(log.isErrorEnabled()) log.error("exception is " + e); 108 continue; 109 } 110 } 112 if(joined) { 113 if(log.isInfoEnabled()) log.info("joined successfully"); 114 return; } 116 else { 117 if(log.isInfoEnabled()) log.info("failed, retrying"); 118 Util.sleep(gms.join_retry_timeout); 119 } 120 121 } } 123 124 125 public void leave(Address mbr) { 126 wrongMethod("leave"); 127 } 128 129 130 public void suspect(Address mbr) { 131 } 133 134 135 public void merge(Vector other_coords) { 136 wrongMethod("merge"); 137 } 138 139 140 public boolean handleJoin(Address mbr) { 141 wrongMethod("handleJoin"); 142 return false; 143 } 144 145 146 149 public void handleLeave(Address mbr, boolean suspected) { 150 wrongMethod("handleLeave"); 151 } 152 153 154 158 public void handleViewChange(ViewId new_view, Vector mems) { 159 if(gms.local_addr != null && mems != null && mems.contains(gms.local_addr)) { 160 synchronized(view_installation_mutex) { joined=true; 162 view_installation_mutex.notifyAll(); 163 gms.installView(new_view, mems); 164 gms.becomeParticipant(); 165 gms.passUp(new Event(Event.BECOME_SERVER)); 166 gms.passDown(new Event(Event.BECOME_SERVER)); 167 } 168 synchronized(initial_mbrs) { initial_mbrs.notifyAll(); } 171 } 172 else 173 if(log.isWarnEnabled()) log.warn("am not member of " + mems + ", will not install view"); 174 } 175 176 177 180 public View handleMerge(ViewId other_view, Vector other_members) { 181 wrongMethod("handleMerge"); 182 return null; 183 } 184 185 186 189 public void handleSuspect(Address mbr) { 190 wrongMethod("handleSuspect"); 191 return; 192 } 193 194 195 public boolean handleUpEvent(Event evt) { 196 Vector tmp; 197 198 switch(evt.getType()) { 199 200 case Event.FIND_INITIAL_MBRS_OK: 201 tmp=(Vector )evt.getArg(); 202 synchronized(initial_mbrs) { 203 if(tmp != null && tmp.size() > 0) 204 for(int i=0; i < tmp.size(); i++) 205 initial_mbrs.addElement(tmp.elementAt(i)); 206 initial_mbrs.notifyAll(); 207 } 208 return false; } 210 211 return true; 212 } 213 214 215 216 217 218 219 220 221 222 223 224 225 226 230 void findInitialMembers() { 231 PingRsp ping_rsp; 232 233 synchronized(initial_mbrs) { 234 initial_mbrs.removeAllElements(); 235 gms.passDown(Event.FIND_INITIAL_MBRS_EVT); 236 if(initial_mbrs.size() == 0) { 237 try { 238 initial_mbrs.wait(); 239 } 240 catch(Exception e) { 241 } 242 } 243 244 for(int i=0; i < initial_mbrs.size(); i++) { 245 ping_rsp=(PingRsp)initial_mbrs.elementAt(i); 246 if(ping_rsp.own_addr != null && gms.local_addr != null && 247 ping_rsp.own_addr.equals(gms.local_addr)) { 248 initial_mbrs.removeElementAt(i); 249 break; 250 } 251 } 252 } 253 } 254 255 256 260 Address determineCoord(Vector mbrs) { 261 PingRsp mbr; 262 Hashtable votes; 263 int count, most_votes; 264 Address winner=null, tmp; 265 266 if(mbrs == null || mbrs.size() < 1) 267 return null; 268 269 votes=new Hashtable (5); 270 271 for(int i=0; i < mbrs.size(); i++) { 273 mbr=(PingRsp)mbrs.elementAt(i); 274 if(mbr.coord_addr != null) { 275 if(!votes.containsKey(mbr.coord_addr)) 276 votes.put(mbr.coord_addr, new Integer (1)); 277 else { 278 count=((Integer )votes.get(mbr.coord_addr)).intValue(); 279 votes.put(mbr.coord_addr, new Integer (count + 1)); 280 } 281 } 282 } 283 284 { 285 if(votes.size() > 1) 286 if(log.isWarnEnabled()) log.warn("there was more than 1 candidate for coordinator: " + votes); 287 else 288 if(log.isInfoEnabled()) log.info("election results: " + votes); 289 } 290 291 292 most_votes=0; 294 for(Enumeration e=votes.keys(); e.hasMoreElements();) { 295 tmp=(Address)e.nextElement(); 296 count=((Integer )votes.get(tmp)).intValue(); 297 if(count > most_votes) { 298 winner=tmp; 299 most_votes=count; 301 } 302 } 303 votes.clear(); 304 return winner; 305 } 306 307 308 } 309 | Popular Tags |