1 3 package org.jgroups.blocks; 4 5 6 import org.apache.commons.logging.Log; 7 import org.apache.commons.logging.LogFactory; 8 import org.jgroups.Address; 9 import org.jgroups.Message; 10 import org.jgroups.Transport; 11 import org.jgroups.View; 12 import org.jgroups.util.Command; 13 import org.jgroups.util.RspList; 14 15 import java.util.Vector ; 16 17 18 19 20 44 public class GroupRequest implements RspCollector, Command { 45 46 public static final int GET_FIRST=1; 47 48 49 public static final int GET_ALL=2; 50 51 52 public static final int GET_MAJORITY=3; 53 54 55 public static final int GET_ABS_MAJORITY=4; 56 57 58 public static final int GET_N=5; 59 60 61 public static final int GET_NONE=6; 62 63 private static final short NOT_RECEIVED=0; 64 private static final short RECEIVED=1; 65 private static final short SUSPECTED=2; 66 67 private Address membership[]=null; private Object responses[]=null; private short received[]=null; 71 72 private final Vector suspects=new Vector (); 73 74 75 private final Vector members=new Vector (); 76 77 78 private final int max_suspects=40; 79 protected Message request_msg=null; 80 protected RequestCorrelator corr=null; protected Transport transport=null; 83 protected int rsp_mode=GET_ALL; 84 protected boolean done=false; 85 protected final Object rsp_mutex=new Object (); 86 protected long timeout=0; 87 protected int expected_mbrs=0; 88 89 protected static final Log log=LogFactory.getLog(GroupRequest.class); 90 91 92 private static long last_req_id=1; 93 94 protected long req_id=-1; 96 97 126 public GroupRequest(Message m, RequestCorrelator corr, Vector members, int rsp_mode) { 127 request_msg=m; 128 this.corr=corr; 129 this.rsp_mode=rsp_mode; 130 reset(members); 131 } 133 134 135 139 public GroupRequest(Message m, RequestCorrelator corr, Vector members, int rsp_mode, 140 long timeout, int expected_mbrs) { 141 this(m, corr, members, rsp_mode); 142 if(timeout > 0) 143 this.timeout=timeout; 144 this.expected_mbrs=expected_mbrs; 145 } 146 147 148 public GroupRequest(Message m, Transport transport, Vector members, int rsp_mode) { 149 request_msg=m; 150 this.transport=transport; 151 this.rsp_mode=rsp_mode; 152 reset(members); 153 } 155 156 157 161 public GroupRequest(Message m, Transport transport, Vector members, 162 int rsp_mode, long timeout, int expected_mbrs) { 163 this(m, transport, members, rsp_mode); 164 if(timeout > 0) 165 this.timeout=timeout; 166 this.expected_mbrs=expected_mbrs; 167 } 168 169 170 175 public boolean execute() { 176 boolean retval; 177 if(corr == null && transport == null) { 178 if(log.isErrorEnabled()) log.error("both corr and transport are null, cannot send group request"); 179 return false; 180 } 181 synchronized(rsp_mutex) { 182 done=false; 183 retval=doExecute(timeout); 184 if(retval == false && log.isTraceEnabled()) 185 log.trace("call did not execute correctly, request is " + toString()); 186 done=true; 187 return retval; 188 } 189 } 190 191 192 195 public void reset(Message m, int mode, long timeout) { 196 synchronized(rsp_mutex) { 197 done=false; 198 request_msg=m; 199 rsp_mode=mode; 200 this.timeout=timeout; 201 rsp_mutex.notifyAll(); 202 } 203 } 204 205 206 public void reset(Message m, final Vector members, int rsp_mode, long timeout, int expected_rsps) { 207 synchronized(rsp_mutex) { 208 reset(m, rsp_mode, timeout); 209 reset(members); 210 this.expected_mbrs=expected_rsps; 212 rsp_mutex.notifyAll(); 213 } 214 } 215 216 222 public void reset(Vector mbrs) { 223 if(mbrs != null) { 224 int size=mbrs.size(); 225 membership=new Address[size]; 226 responses=new Object [size]; 227 received=new short[size]; 228 for(int i=0; i < size; i++) { 229 membership[i]=(Address)mbrs.elementAt(i); 230 responses[i]=null; 231 received[i]=NOT_RECEIVED; 232 } 233 this.members.clear(); 235 this.members.addAll(mbrs); 236 } 237 else { 238 if(membership != null) { 239 for(int i=0; i < membership.length; i++) { 240 responses[i]=null; 241 received[i]=NOT_RECEIVED; 242 } 243 } 244 } 245 } 246 247 248 249 254 public void receiveResponse(Message m) { 255 Address sender=m.getSrc(), mbr; 256 Object val=null; 257 if(done) { 258 if(log.isWarnEnabled()) log.warn("command is done; cannot add response !"); 259 return; 260 } 261 if(suspects != null && suspects.size() > 0 && suspects.contains(sender)) { 262 if(log.isWarnEnabled()) log.warn("received response from suspected member " + sender + "; discarding"); 263 return; 264 } 265 if(m.getLength() > 0) { 266 try { 267 val=m.getObject(); 268 } 269 catch(Exception e) { 270 if(log.isErrorEnabled()) log.error("exception=" + e); 271 } 272 } 273 synchronized(rsp_mutex) { 274 for(int i=0; i < membership.length; i++) { 275 mbr=membership[i]; 276 if(mbr.equals(sender)) { 277 if(received[i] == NOT_RECEIVED) { 278 responses[i]=val; 279 received[i]=RECEIVED; 280 if(log.isTraceEnabled()) 281 log.trace("received response for request " + req_id + ", sender=" + sender + ", val=" + val); 282 rsp_mutex.notifyAll(); break; 284 } 285 } 286 } 287 } 288 } 290 291 292 298 public void suspect(Address suspected_member) { 299 Address mbr; 300 synchronized(rsp_mutex) { for(int i=0; i < membership.length; i++) { 302 mbr=membership[i]; 303 if(mbr.equals(suspected_member)) { 304 addSuspect(suspected_member); 305 responses[i]=null; 306 received[i]=SUSPECTED; 307 rsp_mutex.notifyAll(); 308 break; 309 } 310 } 311 } 312 } 314 315 316 333 public void viewChange(View new_view) { 334 Address mbr; 335 Vector mbrs=new_view != null? new_view.getMembers() : null; 336 if(membership == null || membership.length == 0 || mbrs == null) 337 return; 338 339 synchronized(rsp_mutex) { 340 this.members.clear(); 341 this.members.addAll(mbrs); 342 for(int i=0; i < membership.length; i++) { 343 mbr=membership[i]; 344 if(!mbrs.contains(mbr)) { 345 addSuspect(mbr); 346 responses[i]=null; 347 received[i]=SUSPECTED; 348 } 349 } 350 rsp_mutex.notifyAll(); 351 } 352 } 353 354 355 356 357 358 359 360 public RspList getResults() { 361 RspList retval=new RspList(); 362 Address sender; 363 synchronized(rsp_mutex) { 364 for(int i=0; i < membership.length; i++) { 365 sender=membership[i]; 366 switch(received[i]) { 367 case SUSPECTED: 368 retval.addSuspect(sender); 369 break; 370 case RECEIVED: 371 retval.addRsp(sender, responses[i]); 372 break; 373 case NOT_RECEIVED: 374 retval.addNotReceived(sender); 375 break; 376 } 377 } 378 return retval; 379 } 380 } 381 382 383 public String toString() { 384 StringBuffer ret=new StringBuffer (); 385 ret.append("[GroupRequest:\n"); 386 ret.append("req_id=").append(req_id).append('\n'); 387 ret.append("members: "); 388 for(int i=0; i < membership.length; i++) 389 ret.append(membership[i] + " "); 390 ret.append("\nresponses: "); 391 for(int i=0; i < responses.length; i++) 392 ret.append(responses[i] + " "); 393 ret.append("\nreceived: "); 394 for(int i=0; i < received.length; i++) 395 ret.append(receivedToString(received[i]) + " "); 396 if(suspects.size() > 0) 397 ret.append("\nsuspects: " + suspects); 398 ret.append("\nrequest_msg: " + request_msg); 399 ret.append("\nrsp_mode: " + rsp_mode); 400 ret.append("\ndone: " + done); 401 ret.append("\ntimeout: " + timeout); 402 ret.append("\nexpected_mbrs: " + expected_mbrs); 403 ret.append("\n]"); 404 return ret.toString(); 405 } 406 407 408 public int getNumSuspects() { 409 return suspects.size(); 410 } 411 412 413 public Vector getSuspects() { 414 return suspects; 415 } 416 417 418 public boolean isDone() { 419 return done; 420 } 421 422 423 424 425 426 protected int determineMajority(int i) { 427 return i < 2? i : (i / 2) + 1; 428 } 429 430 431 private static synchronized long getRequestId() { 432 long result=System.currentTimeMillis(); 433 if(result <= last_req_id) { 434 result=last_req_id + 1; 435 } 436 last_req_id=result; 437 return result; 438 } 439 440 441 protected boolean doExecute(long timeout) { 442 long start_time=0; 443 Address mbr, suspect; 444 req_id=getRequestId(); 445 reset(null); if(suspects != null) { for(int i=0; i < suspects.size(); i++) { 448 suspect=(Address)suspects.elementAt(i); 449 for(int j=0; j < membership.length; j++) { 450 mbr=membership[j]; 451 if(mbr.equals(suspect)) { 452 received[j]=SUSPECTED; 453 break; } 455 } 456 } 457 } 458 459 try { 460 if(log.isTraceEnabled()) log.trace("sending request (id=" + req_id + ')'); 461 if(corr != null) { 462 java.util.List tmp=members != null? members : null; 463 corr.sendRequest(req_id, tmp, request_msg, rsp_mode == GET_NONE? null : this); 464 } 465 else { 466 transport.send(request_msg); 467 } 468 } 469 catch(Throwable e) { 470 log.error("exception=" + e); 471 if(corr != null) { 472 corr.done(req_id); 473 } 474 return false; 475 } 476 477 if(timeout <= 0) { 478 while(true) { 479 adjustMembership(); if(getResponses()) { 481 if(corr != null) { 482 corr.done(req_id); 483 } 484 if(log.isTraceEnabled()) { 485 log.trace("received all responses: " + toString()); 486 } 487 return true; 488 } 489 try { 490 rsp_mutex.wait(); 491 } 492 catch(Exception e) { 493 } 494 } 495 } 496 else { 497 start_time=System.currentTimeMillis(); 498 while(timeout > 0) { 499 if(getResponses()) { 500 if(corr != null) 501 corr.done(req_id); 502 if(log.isTraceEnabled()) log.trace("received all responses: " + toString()); 503 return true; 504 } 505 timeout=timeout - (System.currentTimeMillis() - start_time); 506 if(timeout > 0) { 507 try { 508 rsp_mutex.wait(timeout); 509 } 510 catch(Exception e) { 511 } 513 } 514 } 515 if(corr != null) { 516 corr.done(req_id); 517 } 518 return false; 519 } 520 } 521 522 protected boolean getResponses() { 523 int num_not_received=getNum(NOT_RECEIVED); 524 int num_received=getNum(RECEIVED); 525 int num_suspected=getNum(SUSPECTED); 526 int num_total=membership.length; 527 int majority=determineMajority(num_total); 528 switch(rsp_mode) { 529 case GET_FIRST: 530 if(num_received > 0) 531 return true; 532 if(num_suspected >= num_total) 533 return true; 535 break; 536 case GET_ALL: 537 if(num_not_received > 0) 538 return false; 539 return true; 540 case GET_MAJORITY: 541 if(num_received + num_suspected >= majority) 542 return true; 543 break; 544 case GET_ABS_MAJORITY: 545 if(num_received >= majority) 546 return true; 547 break; 548 case GET_N: 549 if(expected_mbrs >= num_total) { 550 rsp_mode=GET_ALL; 551 return getResponses(); 552 } 553 if(num_received >= expected_mbrs) { 554 return true; 555 } 556 if(num_received + num_not_received < expected_mbrs) { 557 if(num_received + num_suspected >= expected_mbrs) { 558 return true; 559 } 560 return false; 561 } 562 return false; 563 case GET_NONE: 564 return true; 565 default : 566 if(log.isErrorEnabled()) log.error("rsp_mode " + rsp_mode + " unknown !"); 567 break; 568 } 569 return false; 570 } 571 572 574 int getNum(int type) { 575 int retval=0; 576 for(int i=0; i < received.length; i++) 577 if(received[i] == type) 578 retval++; 579 return retval; 580 } 581 582 583 584 private String receivedToString(int r) { 585 switch(r) { 586 case RECEIVED: 587 return "RECEIVED"; 588 case NOT_RECEIVED: 589 return "NOR_RECEIVED"; 590 case SUSPECTED: 591 return "SUSPECTED"; 592 default: 593 return "n/a"; 594 } 595 } 596 597 598 609 void adjustMembership() { 610 Address mbr; 611 if(membership == null || membership.length == 0) { 612 return; 614 } 615 for(int i=0; i < membership.length; i++) { 616 mbr=membership[i]; 617 if((this.members != null && !this.members.contains(mbr)) 618 || suspects.contains(mbr)) { 619 addSuspect(mbr); 620 responses[i]=null; 621 received[i]=SUSPECTED; 622 } 623 } 624 } 625 626 630 void addSuspect(Address suspected_mbr) { 631 if(!suspects.contains(suspected_mbr)) { 632 suspects.addElement(suspected_mbr); 633 while(suspects.size() >= max_suspects && suspects.size() > 0) 634 suspects.remove(0); } 636 } 637 } 638 | Popular Tags |