1 3 package org.jgroups.protocols.pbcast; 4 5 6 import org.jgroups.*; 7 import org.jgroups.util.TimeScheduler; 8 9 import java.io.Serializable ; 10 import java.util.Iterator ; 11 import java.util.Vector ; 12 13 14 15 16 21 public class CoordGmsImpl extends GmsImpl { 22 private boolean merging=false; 23 private final MergeTask merge_task=new MergeTask(); 24 private final Vector merge_rsps=new Vector (11); 25 private Serializable merge_id=null; 27 28 private Address merge_leader=null; 29 30 private MergeCanceller merge_canceller=null; 31 32 33 34 public CoordGmsImpl(GMS g) { 35 gms=g; 36 } 37 38 39 void setMergeId(Serializable merge_id) { 40 this.merge_id=merge_id; 41 if(this.merge_id != null) { 42 stopMergeCanceller(); 43 merge_canceller=new MergeCanceller(this.merge_id, gms.merge_timeout); 44 gms.timer.add(merge_canceller); 45 } 46 else { stopMergeCanceller(); 48 } 49 } 50 51 private void stopMergeCanceller() { 52 if(merge_canceller != null) { 53 merge_canceller.cancel(); 54 merge_canceller=null; 55 } 56 } 57 58 public void init() throws Exception { 59 super.init(); 60 cancelMerge(); 61 } 62 63 public void join(Address mbr) { 64 wrongMethod("join"); 65 } 66 67 68 public void leave(Address mbr) { 69 if(mbr == null) { 70 if(log.isErrorEnabled()) log.error("member's address is null !"); 71 return; 72 } 73 if(mbr.equals(gms.local_addr)) 74 leaving=true; 75 handleLeave(mbr, false); } 77 78 public void handleJoinResponse(JoinRsp join_rsp) { 79 wrongMethod("handleJoinResponse"); 80 } 81 82 public void handleLeaveResponse() { 83 ; } 85 86 public void suspect(Address mbr) { 87 handleSuspect(mbr); 88 } 89 90 public void unsuspect(Address mbr) { 91 92 } 93 94 99 public void merge(Vector other_coords) { 100 Membership tmp; 101 102 if(merging) { 103 if(log.isWarnEnabled()) log.warn("merge already in progress, discarded MERGE event"); 104 return; 105 } 106 merge_leader=null; 107 if(other_coords == null) { 108 if(log.isWarnEnabled()) log.warn("list of other coordinators is null. Will not start merge."); 109 return; 110 } 111 112 if(other_coords.size() <= 1) { 113 if(log.isErrorEnabled()) 114 log.error("number of coordinators found is " + other_coords.size() + "; will not perform merge"); 115 return; 116 } 117 118 119 tmp=new Membership(other_coords); 120 tmp.sort(); 121 merge_leader=(Address)tmp.elementAt(0); 122 if(log.isDebugEnabled()) log.debug("coordinators in merge protocol are: " + tmp); 123 if(merge_leader.equals(gms.local_addr) || gms.merge_leader) { 124 if(log.isTraceEnabled()) 125 log.trace("I (" + gms.local_addr + ", merge_leader=" + gms.merge_leader + 126 ") will be the leader. Starting the merge task"); 127 startMergeTask(other_coords); 128 } 129 else { 130 if(log.isTraceEnabled()) log.trace("I (" + gms.local_addr + ") am not the merge leader (" + 131 merge_leader + "), waiting for merge leader to initiate merge"); 132 } 133 } 134 135 139 public void handleMergeRequest(Address sender, Object merge_id) { 140 Digest digest; 141 View view; 142 143 if(sender == null) { 144 if(log.isErrorEnabled()) log.error("sender == null; cannot send back a response"); 145 return; 146 } 147 if(merging) { 148 if(log.isErrorEnabled()) log.error("merge already in progress"); 149 sendMergeRejectedResponse(sender); 150 return; 151 } 152 merging=true; 153 setMergeId((Serializable )merge_id); 154 if(log.isDebugEnabled()) log.debug("sender=" + sender + ", merge_id=" + merge_id); 155 digest=gms.getDigest(); 156 view=new View(gms.view_id.copy(), gms.members.getMembers()); 157 sendMergeResponse(sender, view, digest); 158 } 159 160 161 MergeData getMergeResponse(Address sender, Object merge_id) { 162 Digest digest; 163 View view; 164 MergeData retval; 165 166 if(sender == null) { 167 if(log.isErrorEnabled()) log.error("sender == null; cannot send back a response"); 168 return null; 169 } 170 if(merging) { 171 if(log.isErrorEnabled()) log.error("merge already in progress"); 172 retval=new MergeData(sender, null, null); 173 retval.merge_rejected=true; 174 return retval; 175 } 176 merging=true; 177 setMergeId((Serializable )merge_id); 178 if(log.isDebugEnabled()) log.debug("sender=" + sender + ", merge_id=" + merge_id); 179 180 digest=gms.getDigest(); 181 view=new View(gms.view_id.copy(), gms.members.getMembers()); 182 retval=new MergeData(sender, view, digest); 183 retval.view=view; 184 retval.digest=digest; 185 return retval; 186 } 187 188 189 public void handleMergeResponse(MergeData data, Object merge_id) { 190 if(data == null) { 191 if(log.isErrorEnabled()) log.error("merge data is null"); 192 return; 193 } 194 if(merge_id == null || this.merge_id == null) { 195 if(log.isErrorEnabled()) log.error("merge_id (" 196 + merge_id 197 + ") or this.merge_id (" 198 + this.merge_id 199 + ") == null (sender=" 200 + data.getSender() 201 + ")."); 202 return; 203 } 204 205 if(!this.merge_id.equals(merge_id)) { 206 if(log.isErrorEnabled()) log.error("this.merge_id (" 207 + this.merge_id 208 + ") is different from merge_id (" 209 + merge_id 210 + ')'); 211 return; 212 } 213 214 synchronized(merge_rsps) { 215 if(!merge_rsps.contains(data)) { 216 merge_rsps.addElement(data); 217 merge_rsps.notifyAll(); 218 } 219 } 220 } 221 222 226 public void handleMergeView(MergeData data, Object merge_id) { 227 if(merge_id == null 228 || this.merge_id == null 229 || !this.merge_id.equals(merge_id)) { 230 if(log.isErrorEnabled()) log.error("merge_ids don't match (or are null); merge view discarded"); 231 return; 232 } 233 gms.castViewChange(data.view, data.digest); 234 merging=false; 235 merge_id=null; 236 } 237 238 public void handleMergeCancelled(Object merge_id) { 239 if(merge_id != null 240 && this.merge_id != null 241 && this.merge_id.equals(merge_id)) { 242 if(log.isDebugEnabled()) log.debug("merge was cancelled (merge_id=" + merge_id + ')'); 243 setMergeId(null); 244 this.merge_leader=null; 245 merging=false; 246 } 247 } 248 249 250 private void cancelMerge() { 251 if(merge_id != null && log.isDebugEnabled()) log.debug("cancelling merge (merge_id=" + merge_id + ')'); 252 setMergeId(null); 253 this.merge_leader=null; 254 stopMergeTask(); 255 merging=false; 256 synchronized(merge_rsps) { 257 merge_rsps.clear(); 258 } 259 } 260 261 265 public synchronized JoinRsp handleJoin(Address mbr) { 266 Vector new_mbrs=new Vector (1); 267 View v=null; 268 Digest d, tmp; 269 270 if(log.isDebugEnabled()) log.debug("mbr=" + mbr); 271 if(gms.local_addr.equals(mbr)) { 272 if(log.isErrorEnabled()) log.error("cannot join myself !"); 273 return null; 274 } 275 276 if(gms.members.contains(mbr)) { 277 if(log.isErrorEnabled()) 278 log.error("member " + mbr + " already present; returning existing view " + gms.members.getMembers()); 279 return new JoinRsp(new View(gms.view_id, gms.members.getMembers()), gms.getDigest()); 280 } 282 new_mbrs.addElement(mbr); 283 tmp=gms.getDigest(); if(tmp == null) { 285 if(log.isErrorEnabled()) log.error("received null digest from GET_DIGEST: will cause JOIN to fail"); 286 return null; 287 } 288 if(log.isDebugEnabled()) log.debug("got digest=" + tmp); 289 290 d=new Digest(tmp.size() + 1); 291 d.add(tmp); d.add(mbr, 0, 0); 294 v=gms.getNextView(new_mbrs, null, null); 296 if(log.isDebugEnabled()) log.debug("joined member " + mbr + ", view is " + v); 297 return new JoinRsp(v, d); 298 } 299 300 304 public synchronized void handleLeave(Address mbr, boolean suspected) { 305 Vector v=new Vector (1); 306 if(log.isDebugEnabled()) log.debug("mbr=" + mbr); 308 if(!gms.members.contains(mbr)) { 309 if(log.isErrorEnabled()) log.error("mbr " + mbr + " is not a member !"); 310 return; 311 } 312 313 if(gms.view_id == null) { 314 if(log.isDebugEnabled()) 317 log.debug("gms.view_id is null, I'm not the coordinator anymore (leaving=" + leaving + 318 "); the new coordinator will handle the leave request"); 319 return; 320 } 321 322 sendLeaveResponse(mbr); 324 v.addElement(mbr); 325 if(suspected) 326 gms.castViewChange(null, null, v); 327 else 328 gms.castViewChange(null, v, null); 329 } 330 331 void sendLeaveResponse(Address mbr) { 332 Message msg=new Message(mbr, null, null); 333 GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.LEAVE_RSP); 334 msg.putHeader(gms.getName(), hdr); 335 gms.passDown(new Event(Event.MSG, msg)); 336 } 337 338 344 public void handleViewChange(View new_view, Digest digest) { 345 Vector mbrs=new_view.getMembers(); 346 if(log.isDebugEnabled()) { 347 if(digest != null) 348 log.debug("view=" + new_view + ", digest=" + digest); 349 else 350 log.debug("view=" + new_view); 351 } 352 353 if(leaving && !mbrs.contains(gms.local_addr)) 354 return; 355 gms.installView(new_view, digest); 356 } 357 358 public void handleSuspect(Address mbr) { 359 if(mbr.equals(gms.local_addr)) { 360 if(log.isWarnEnabled()) log.warn("I am the coord and I'm being am suspected -- will probably leave shortly"); 361 return; 362 } 363 handleLeave(mbr, true); } 365 366 public void handleExit() { 367 cancelMerge(); 368 } 369 370 public void stop() { 371 super.stop(); stopMergeTask(); 373 } 374 375 376 377 void startMergeTask(Vector coords) { 378 synchronized(merge_task) { 379 merge_task.start(coords); 380 } 381 } 382 383 void stopMergeTask() { 384 synchronized(merge_task) { 385 merge_task.stop(); 386 } 387 } 388 389 397 void getMergeDataFromSubgroupCoordinators(Vector coords, long timeout) { 398 Message msg; 399 GMS.GmsHeader hdr; 400 Address coord; 401 long curr_time, time_to_wait=0, end_time; 402 int num_rsps_expected=0; 403 404 if(coords == null || coords.size() <= 1) { 405 if(log.isErrorEnabled()) log.error("coords == null or size <= 1"); 406 return; 407 } 408 409 synchronized(merge_rsps) { 410 merge_rsps.removeAllElements(); 411 412 if(log.isDebugEnabled()) log.debug("sending MERGE_REQ to " + coords); 413 for(int i=0; i < coords.size(); i++) { 414 coord=(Address)coords.elementAt(i); 415 416 if(gms.local_addr != null && gms.local_addr.equals(coord)) { 417 merge_rsps.add(getMergeResponse(gms.local_addr, merge_id)); 418 continue; 419 } 420 421 msg=new Message(coord, null, null); 422 hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_REQ); 423 hdr.mbr=gms.local_addr; 424 hdr.merge_id=merge_id; 425 msg.putHeader(gms.getName(), hdr); 426 gms.passDown(new Event(Event.MSG, msg)); 427 } 428 429 num_rsps_expected=coords.size(); 431 curr_time=System.currentTimeMillis(); 432 end_time=curr_time + timeout; 433 while(end_time > curr_time) { 434 time_to_wait=end_time - curr_time; 435 if(log.isDebugEnabled()) log.debug("waiting " + time_to_wait + " msecs for merge responses"); 436 if(merge_rsps.size() < num_rsps_expected) { 437 try { 438 merge_rsps.wait(time_to_wait); 439 } 440 catch(Exception ex) { 441 } 442 } 443 if(log.isDebugEnabled()) 444 log.debug("num_rsps_expected=" + num_rsps_expected + ", actual responses=" + merge_rsps.size()); 445 446 if(merge_rsps.size() >= num_rsps_expected) 447 break; 448 curr_time=System.currentTimeMillis(); 449 } 450 } 451 } 452 453 456 Serializable generateMergeId() { 457 return new ViewId(gms.local_addr, System.currentTimeMillis()); 458 } 460 461 471 MergeData consolidateMergeData(Vector v) { 472 MergeData ret=null; 473 MergeData tmp_data; 474 long logical_time=0; ViewId new_vid, tmp_vid; 476 MergeView new_view; 477 View tmp_view; 478 Membership new_mbrs=new Membership(); 479 int num_mbrs=0; 480 Digest new_digest=null; 481 Address new_coord; 482 Vector subgroups=new Vector (11); 483 485 for(int i=0; i < v.size(); i++) { 486 tmp_data=(MergeData)v.elementAt(i); 487 if(log.isDebugEnabled()) log.debug("merge data is " + tmp_data); 488 tmp_view=tmp_data.getView(); 489 if(tmp_view != null) { 490 tmp_vid=tmp_view.getVid(); 491 if(tmp_vid != null) { 492 logical_time=Math.max(logical_time, tmp_vid.getId()); 494 } 495 new_mbrs.add(tmp_view.getMembers()); 497 subgroups.addElement(tmp_view.clone()); 498 } 499 } 500 501 new_mbrs.sort(); 503 num_mbrs=new_mbrs.size(); 504 new_coord=num_mbrs > 0? (Address)new_mbrs.elementAt(0) : null; 505 if(new_coord == null) { 506 if(log.isErrorEnabled()) log.error("new_coord == null"); 507 return null; 508 } 509 new_vid=new ViewId(new_coord, logical_time + 1); 511 512 new_view=new MergeView(new_vid, new_mbrs.getMembers(), subgroups); 514 if(log.isDebugEnabled()) log.debug("new merged view will be " + new_view); 515 516 new_digest=consolidateDigests(v, num_mbrs); 518 if(new_digest == null) { 519 if(log.isErrorEnabled()) log.error("digest could not be consolidated"); 520 return null; 521 } 522 if(log.isDebugEnabled()) log.debug("consolidated digest=" + new_digest); 523 ret=new MergeData(gms.local_addr, new_view, new_digest); 524 return ret; 525 } 526 527 531 Digest consolidateDigests(Vector v, int num_mbrs) { 532 MergeData data; 533 Digest tmp_digest, retval=new Digest(num_mbrs); 534 535 for(int i=0; i < v.size(); i++) { 536 data=(MergeData)v.elementAt(i); 537 tmp_digest=data.getDigest(); 538 if(tmp_digest == null) { 539 if(log.isErrorEnabled()) log.error("tmp_digest == null; skipping"); 540 continue; 541 } 542 retval.merge(tmp_digest); 543 } 544 return retval; 545 } 546 547 555 void sendMergeView(Vector coords, MergeData combined_merge_data) { 556 Message msg; 557 GMS.GmsHeader hdr; 558 Address coord; 559 View v; 560 Digest d; 561 562 if(coords == null || combined_merge_data == null) 563 return; 564 v=combined_merge_data.view; 565 d=combined_merge_data.digest; 566 if(v == null || d == null) { 567 if(log.isErrorEnabled()) log.error("view or digest is null, cannot send consolidated merge view/digest"); 568 return; 569 } 570 571 for(int i=0; i < coords.size(); i++) { 572 coord=(Address)coords.elementAt(i); 573 msg=new Message(coord, null, null); 574 hdr=new GMS.GmsHeader(GMS.GmsHeader.INSTALL_MERGE_VIEW); 575 hdr.view=v; 576 hdr.my_digest=d; 577 hdr.merge_id=merge_id; 578 msg.putHeader(gms.getName(), hdr); 579 gms.passDown(new Event(Event.MSG, msg)); 580 } 581 } 582 583 586 void sendMergeResponse(Address sender, View view, Digest digest) { 587 Message msg=new Message(sender, null, null); 588 GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_RSP); 589 hdr.merge_id=merge_id; 590 hdr.view=view; 591 hdr.my_digest=digest; 592 msg.putHeader(gms.getName(), hdr); 593 if(log.isDebugEnabled()) log.debug("response=" + hdr); 594 gms.passDown(new Event(Event.MSG, msg)); 595 } 596 597 void sendMergeRejectedResponse(Address sender) { 598 Message msg=new Message(sender, null, null); 599 GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_RSP); 600 hdr.merge_rejected=true; 601 hdr.merge_id=merge_id; 602 msg.putHeader(gms.getName(), hdr); 603 if(log.isDebugEnabled()) log.debug("response=" + hdr); 604 gms.passDown(new Event(Event.MSG, msg)); 605 } 606 607 void sendMergeCancelledMessage(Vector coords, Serializable merge_id) { 608 Message msg; 609 GMS.GmsHeader hdr; 610 Address coord; 611 612 if(coords == null || merge_id == null) { 613 if(log.isErrorEnabled()) log.error("coords or merge_id == null"); 614 return; 615 } 616 for(int i=0; i < coords.size(); i++) { 617 coord=(Address)coords.elementAt(i); 618 msg=new Message(coord, null, null); 619 hdr=new GMS.GmsHeader(GMS.GmsHeader.CANCEL_MERGE); 620 hdr.merge_id=merge_id; 621 msg.putHeader(gms.getName(), hdr); 622 gms.passDown(new Event(Event.MSG, msg)); 623 } 624 } 625 626 627 void removeRejectedMergeRequests(Vector coords) { 628 MergeData data; 629 for(Iterator it=merge_rsps.iterator(); it.hasNext();) { 630 data=(MergeData)it.next(); 631 if(data.merge_rejected) { 632 if(data.getSender() != null && coords != null) 633 coords.removeElement(data.getSender()); 634 it.remove(); 635 if(log.isDebugEnabled()) log.debug("removed element " + data); 636 } 637 } 638 } 639 640 641 642 649 private class MergeTask implements Runnable { 650 Thread t=null; 651 Vector coords=null; 653 public void start(Vector coords) { 654 if(t == null || !t.isAlive()) { 655 this.coords=(Vector )(coords != null? coords.clone() : null); 656 t=new Thread (this, "MergeTask thread"); 657 t.setDaemon(true); 658 t.start(); 659 } 660 } 661 662 public void stop() { 663 Thread tmp=t; 664 if(isRunning()) { 665 t=null; 666 tmp.interrupt(); 667 } 668 t=null; 669 coords=null; 670 } 671 672 public boolean isRunning() { 673 return t != null && t.isAlive(); 674 } 675 676 679 public void run() { 680 MergeData combined_merge_data=null; 681 682 if(merging == true) { 683 if(log.isWarnEnabled()) log.warn("merge is already in progress, terminating"); 684 return; 685 } 686 687 if(log.isDebugEnabled()) log.debug("merge task started"); 688 try { 689 690 691 setMergeId(generateMergeId()); 692 693 694 getMergeDataFromSubgroupCoordinators(coords, gms.merge_timeout); 695 696 698 removeRejectedMergeRequests(coords); 699 700 if(merge_rsps.size() <= 1) { 701 if(log.isWarnEnabled()) 702 log.warn("merge responses from subgroup coordinators <= 1 (" + merge_rsps + "). Cancelling merge"); 703 sendMergeCancelledMessage(coords, merge_id); 704 return; 705 } 706 707 708 combined_merge_data=consolidateMergeData(merge_rsps); 709 if(combined_merge_data == null) { 710 if(log.isErrorEnabled()) log.error("combined_merge_data == null"); 711 sendMergeCancelledMessage(coords, merge_id); 712 return; 713 } 714 715 717 sendMergeView(coords, combined_merge_data); 718 } 719 catch(Throwable ex) { 720 if(log.isErrorEnabled()) log.error("exception=" + ex); 721 } 722 finally { 723 merging=false; 724 merge_leader=null; 725 if(log.isDebugEnabled()) log.debug("merge task terminated"); 726 t=null; 727 } 728 } 729 } 730 731 732 private class MergeCanceller implements TimeScheduler.Task { 733 private Object my_merge_id=null; 734 private long timeout; 735 private boolean cancelled=false; 736 737 public MergeCanceller(Object my_merge_id, long timeout) { 738 this.my_merge_id=my_merge_id; 739 this.timeout=timeout; 740 } 741 742 public boolean cancelled() { 743 return cancelled; 744 } 745 746 public void cancel() { 747 cancelled=true; 748 } 749 750 public long nextInterval() { 751 return timeout; 752 } 753 754 public void run() { 755 if(merge_id != null && my_merge_id.equals(merge_id)) { 756 if(log.isTraceEnabled()) 757 log.trace("cancelling merge due to timer timeout (" + timeout + " ms)"); 758 cancelMerge(); 759 cancelled=true; 760 } 761 else { 762 if(log.isTraceEnabled()) 763 log.trace("timer kicked in after " + timeout + " ms, but no (or different) merge was in progress: " + 764 "merge_id=" + merge_id + ", my_merge_id=" + my_merge_id); 765 } 766 } 767 } 768 769 } 770 | Popular Tags |