1 3 package org.jgroups.protocols.pbcast; 4 5 6 import org.jgroups.*; 7 import org.jgroups.protocols.PingRsp; 8 import org.jgroups.util.Promise; 9 import org.jgroups.util.Util; 10 11 import java.util.*; 12 13 14 24 public class ClientGmsImpl extends GmsImpl { 25 private final Vector initial_mbrs=new Vector(11); 26 private boolean initial_mbrs_received=false; 27 private final Promise join_promise=new Promise(); 28 29 30 public ClientGmsImpl(GMS g) { 31 gms=g; 32 } 33 34 public void init() throws Exception { 35 super.init(); 36 synchronized(initial_mbrs) { 37 initial_mbrs.clear(); 38 initial_mbrs_received=false; 39 } 40 join_promise.reset(); 41 } 42 43 44 57 public void join(Address mbr) { 58 Address coord=null; 59 JoinRsp rsp=null; 60 Digest tmp_digest=null; 61 leaving=false; 62 63 join_promise.reset(); 64 while(!leaving) { 65 findInitialMembers(); 66 if(log.isDebugEnabled()) log.debug("initial_mbrs are " + initial_mbrs); 67 if(initial_mbrs.size() == 0) { 68 if(gms.disable_initial_coord) { 69 if(log.isTraceEnabled()) 70 log.trace("received an initial membership of 0, but cannot become coordinator " + 71 "(disable_initial_coord=true), will retry fetching the initial membership"); 72 continue; 73 } 74 if(log.isDebugEnabled()) 75 log.debug("no initial members discovered: creating group as first member"); 76 becomeSingletonMember(mbr); 77 return; 78 } 79 80 coord=determineCoord(initial_mbrs); 81 if(coord == null) { if(log.isTraceEnabled()) 83 log.trace("could not determine coordinator from responses " + initial_mbrs); 84 85 Set clients=new TreeSet(); clients.add(mbr); for(int i=0; i < initial_mbrs.size(); i++) { 89 PingRsp pingRsp=(PingRsp)initial_mbrs.elementAt(i); 90 Address client_addr=pingRsp.getAddress(); 91 if(client_addr != null) 92 clients.add(client_addr); 93 } 94 if(log.isTraceEnabled()) 95 log.trace("clients to choose new coord from are: " + clients); 96 Address new_coord=(Address)clients.iterator().next(); 97 if(new_coord.equals(mbr)) { 98 if(log.isTraceEnabled()) 99 log.trace("I'm the first of the clients, will become singleton"); 100 becomeSingletonMember(mbr); 101 return; 102 } 103 else { 104 if(log.isTraceEnabled()) 105 log.trace("I'm not the first of the clients, waiting for another client to become coord"); 106 Util.sleep(500); 107 } 108 continue; 109 } 110 111 try { 112 if(log.isDebugEnabled()) 113 log.debug("sending handleJoin(" + mbr + ") to " + coord); 114 sendJoinMessage(coord, mbr); 115 rsp=(JoinRsp)join_promise.getResult(gms.join_timeout); 116 117 if(rsp == null) { 118 if(log.isWarnEnabled()) log.warn("join(" + mbr + ") failed, retrying"); 119 } 120 else { 121 tmp_digest=rsp.getDigest(); 123 if(tmp_digest != null) { 124 tmp_digest.incrementHighSeqno(coord); if(log.isDebugEnabled()) log.debug("digest is " + tmp_digest); 126 gms.setDigest(tmp_digest); 127 } 128 else 129 if(log.isErrorEnabled()) log.error("digest of JOIN response is null"); 130 131 if(log.isDebugEnabled()) log.debug("[" + gms.local_addr + "]: JoinRsp=" + rsp.getView() + 133 " [size=" + rsp.getView().size() + "]\n\n"); 134 135 if(rsp.getView() != null) { 136 if(!installView(rsp.getView())) { 137 if(log.isErrorEnabled()) log.error("view installation failed, retrying to join group"); 138 continue; 139 } 140 gms.passUp(new Event(Event.BECOME_SERVER)); 141 gms.passDown(new Event(Event.BECOME_SERVER)); 142 return; 143 } 144 else 145 if(log.isErrorEnabled()) log.error("view of JOIN response is null"); 146 } 147 } 148 catch(Exception e) { 149 if(log.isDebugEnabled()) log.debug("exception=" + e.toString() + ", retrying"); 150 } 151 152 Util.sleep(gms.join_retry_timeout); 153 } 154 } 155 156 157 public void leave(Address mbr) { 158 leaving=true; 159 wrongMethod("leave"); 160 } 161 162 163 public void handleJoinResponse(JoinRsp join_rsp) { 164 join_promise.setResult(join_rsp); } 166 167 public void handleLeaveResponse() { 168 ; } 170 171 172 public void suspect(Address mbr) { 173 ; 174 } 175 176 public void unsuspect(Address mbr) { 177 wrongMethod("unsuspect"); 178 } 179 180 181 public JoinRsp handleJoin(Address mbr) { 182 wrongMethod("handleJoin"); 183 return null; 184 } 185 186 187 188 public void handleLeave(Address mbr, boolean suspected) { 189 wrongMethod("handleLeave"); 190 } 191 192 193 196 public synchronized void handleViewChange(View new_view, Digest digest) { 197 if(log.isDebugEnabled()) log.debug("view " + new_view.getMembers() + 198 " is discarded as we are not a participant"); 199 } 200 201 202 206 private boolean installView(View new_view) { 207 Vector mems=new_view.getMembers(); 208 if(log.isDebugEnabled()) log.debug("new_view=" + new_view); 209 if(gms.local_addr == null || mems == null || !mems.contains(gms.local_addr)) { 210 if(log.isErrorEnabled()) log.error("I (" + gms.local_addr + 211 ") am not member of " + mems + ", will not install view"); 212 return false; 213 } 214 gms.installView(new_view); 215 gms.becomeParticipant(); 216 gms.passUp(new Event(Event.BECOME_SERVER)); 217 gms.passDown(new Event(Event.BECOME_SERVER)); 218 return true; 219 } 220 221 222 223 public void handleSuspect(Address mbr) { 224 wrongMethod("handleSuspect"); 225 return; 226 } 227 228 229 public boolean handleUpEvent(Event evt) { 230 Vector tmp; 231 232 switch(evt.getType()) { 233 234 case Event.FIND_INITIAL_MBRS_OK: 235 tmp=(Vector)evt.getArg(); 236 synchronized(initial_mbrs) { 237 if(tmp != null && tmp.size() > 0) { 238 initial_mbrs.addAll(tmp); 239 } 240 initial_mbrs_received=true; 241 initial_mbrs.notifyAll(); 242 } 243 return false; } 245 return true; 246 } 247 248 249 250 251 252 253 254 255 256 void sendJoinMessage(Address coord, Address mbr) { 257 Message msg; 258 GMS.GmsHeader hdr; 259 260 msg=new Message(coord, null, null); 261 hdr=new GMS.GmsHeader(GMS.GmsHeader.JOIN_REQ, mbr); 262 msg.putHeader(gms.getName(), hdr); 263 gms.passDown(new Event(Event.MSG, msg)); 264 } 265 266 267 271 void findInitialMembers() { 272 PingRsp ping_rsp; 273 274 synchronized(initial_mbrs) { 275 initial_mbrs.removeAllElements(); 276 initial_mbrs_received=false; 277 gms.passDown(new Event(Event.FIND_INITIAL_MBRS)); 278 279 if(initial_mbrs_received == false) { 282 try { 283 initial_mbrs.wait(); 284 } 285 catch(Exception e) { 286 } 287 } 288 289 for(int i=0; i < initial_mbrs.size(); i++) { 290 ping_rsp=(PingRsp)initial_mbrs.elementAt(i); 291 if(ping_rsp.own_addr != null && gms.local_addr != null && 292 ping_rsp.own_addr.equals(gms.local_addr)) { 293 initial_mbrs.removeElementAt(i); 294 break; 295 } 296 } 297 } 298 } 299 300 301 305 Address determineCoord(Vector mbrs) { 306 PingRsp mbr; 307 Hashtable votes; 308 int count, most_votes; 309 Address winner=null, tmp; 310 311 if(mbrs == null || mbrs.size() < 1) 312 return null; 313 314 votes=new Hashtable(5); 315 316 for(int i=0; i < mbrs.size(); i++) { 318 mbr=(PingRsp)mbrs.elementAt(i); 319 if(mbr.is_server && mbr.coord_addr != null) { 320 if(!votes.containsKey(mbr.coord_addr)) 321 votes.put(mbr.coord_addr, new Integer (1)); 322 else { 323 count=((Integer )votes.get(mbr.coord_addr)).intValue(); 324 votes.put(mbr.coord_addr, new Integer (count + 1)); 325 } 326 } 327 } 328 329 if(log.isDebugEnabled()) { 330 if(votes.size() > 1) 331 if(log.isWarnEnabled()) log.warn("there was more than 1 candidate for coordinator: " + votes); 332 else 333 if(log.isDebugEnabled()) log.debug("election results: " + votes); 334 } 335 336 337 most_votes=0; 339 for(Enumeration e=votes.keys(); e.hasMoreElements();) { 340 tmp=(Address)e.nextElement(); 341 count=((Integer )votes.get(tmp)).intValue(); 342 if(count > most_votes) { 343 winner=tmp; 344 most_votes=count; 346 } 347 } 348 votes.clear(); 349 return winner; 350 } 351 352 353 void becomeSingletonMember(Address mbr) { 354 Digest initial_digest; 355 ViewId view_id=null; 356 Vector mbrs=new Vector(1); 357 358 initial_digest=new Digest(1); initial_digest.add(gms.local_addr, 0, 0); gms.setDigest(initial_digest); 362 363 view_id=new ViewId(mbr); mbrs.addElement(mbr); 365 gms.installView(new View(view_id, mbrs)); 366 gms.becomeCoordinator(); 368 gms.passUp(new Event(Event.BECOME_SERVER)); 369 gms.passDown(new Event(Event.BECOME_SERVER)); 370 if(log.isDebugEnabled()) log.debug("created group (first member). My view is " + gms.view_id + 371 ", impl is " + gms.getImpl().getClass().getName()); 372 } 373 374 375 } 376 | Popular Tags |