1 3 package org.jgroups.protocols.pbcast; 4 5 6 import org.jgroups.*; 7 import org.jgroups.stack.Protocol; 8 import org.jgroups.util.BoundedList; 9 import org.jgroups.util.TimeScheduler; 10 import org.jgroups.util.Util; 11 import org.jgroups.util.Streamable; 12 13 import java.io.*; 14 import java.util.Hashtable ; 15 import java.util.Iterator ; 16 import java.util.Properties ; 17 import java.util.Vector ; 18 19 20 21 22 27 public class GMS extends Protocol { 28 private GmsImpl impl=null; 29 Address local_addr=null; 30 final Membership members=new Membership(); private final Membership tmp_members=new Membership(); 33 34 private final Vector joining=new Vector (7); 35 36 37 private final Vector leaving=new Vector (7); 38 39 ViewId view_id=null; 40 private long ltime=0; 41 long join_timeout=5000; 42 long join_retry_timeout=2000; 43 long leave_timeout=5000; 44 private long digest_timeout=5000; long merge_timeout=10000; private final Object impl_mutex=new Object (); private final Object digest_mutex=new Object (); private Digest digest=null; private final Hashtable impls=new Hashtable (3); 50 private boolean shun=true; 51 boolean merge_leader=false; private boolean print_local_addr=true; 53 boolean disable_initial_coord=false; static final String CLIENT="Client"; 55 static final String COORD="Coordinator"; 56 static final String PART="Participant"; 57 TimeScheduler timer=null; 58 59 60 protected int num_prev_mbrs=50; 61 62 63 BoundedList prev_members=null; 64 65 static final String name="GMS"; 66 67 68 69 public GMS() { 70 initState(); 71 } 72 73 74 public String getName() { 75 return name; 76 } 77 78 79 public Vector requiredDownServices() { 80 Vector retval=new Vector (3); 81 retval.addElement(new Integer (Event.GET_DIGEST)); 82 retval.addElement(new Integer (Event.SET_DIGEST)); 83 retval.addElement(new Integer (Event.FIND_INITIAL_MBRS)); 84 return retval; 85 } 86 87 88 public void setImpl(GmsImpl new_impl) { 89 synchronized(impl_mutex) { 90 impl=new_impl; 91 if(log.isDebugEnabled()) { 92 String msg=(local_addr != null? local_addr.toString()+" " : "") + "changed role to " + new_impl.getClass().getName(); 93 log.debug(msg); 94 } 95 } 96 } 97 98 99 public GmsImpl getImpl() { 100 return impl; 101 } 102 103 104 public void init() throws Exception { 105 prev_members=new BoundedList(num_prev_mbrs); 106 timer=stack != null? stack.timer : null; 107 if(timer == null) 108 throw new Exception ("GMS.init(): timer is null"); 109 if(impl != null) 110 impl.init(); 111 } 112 113 public void start() throws Exception { 114 if(impl != null) impl.start(); 115 } 116 117 public void stop() { 118 if(impl != null) impl.stop(); 119 if(prev_members != null) 120 prev_members.removeAll(); 121 } 122 123 124 public void becomeCoordinator() { 125 CoordGmsImpl tmp=(CoordGmsImpl)impls.get(COORD); 126 if(tmp == null) { 127 tmp=new CoordGmsImpl(this); 128 impls.put(COORD, tmp); 129 } 130 try { 131 tmp.init(); 132 } 133 catch(Exception e) { 134 log.error("exception switching to coordinator role", e); 135 } 136 setImpl(tmp); 137 } 138 139 140 public void becomeParticipant() { 141 ParticipantGmsImpl tmp=(ParticipantGmsImpl)impls.get(PART); 142 143 if(tmp == null) { 144 tmp=new ParticipantGmsImpl(this); 145 impls.put(PART, tmp); 146 } 147 try { 148 tmp.init(); 149 } 150 catch(Exception e) { 151 log.error("exception switching to participant", e); 152 } 153 setImpl(tmp); 154 } 155 156 public void becomeClient() { 157 ClientGmsImpl tmp=(ClientGmsImpl)impls.get(CLIENT); 158 if(tmp == null) { 159 tmp=new ClientGmsImpl(this); 160 impls.put(CLIENT, tmp); 161 } 162 try { 163 tmp.init(); 164 } 165 catch(Exception e) { 166 log.error("exception switching to client role", e); 167 } 168 setImpl(tmp); 169 } 170 171 172 boolean haveCoordinatorRole() { 173 return impl != null && impl instanceof CoordGmsImpl; 174 } 175 176 177 181 public View getNextView(Vector new_mbrs, Vector old_mbrs, Vector suspected_mbrs) { 182 Vector mbrs; 183 long vid=0; 184 View v; 185 Membership tmp_mbrs=null; 186 Address tmp_mbr; 187 188 synchronized(members) { 189 if(view_id == null) { 190 log.error("view_id is null"); 191 return null; } 193 vid=Math.max(view_id.getId(), ltime) + 1; 194 ltime=vid; 195 if(log.isDebugEnabled()) log.debug("VID=" + vid + ", current members=" + 196 Util.printMembers(members.getMembers()) + 197 ", new_mbrs=" + Util.printMembers(new_mbrs) + 198 ", old_mbrs=" + Util.printMembers(old_mbrs) + ", suspected_mbrs=" + 199 Util.printMembers(suspected_mbrs)); 200 201 tmp_mbrs=tmp_members.copy(); tmp_mbrs.remove(suspected_mbrs); 203 tmp_mbrs.remove(old_mbrs); 204 tmp_mbrs.add(new_mbrs); 205 mbrs=tmp_mbrs.getMembers(); 206 v=new View(local_addr, vid, mbrs); 207 208 tmp_members.set(mbrs); 210 211 if(new_mbrs != null) { 213 for(int i=0; i < new_mbrs.size(); i++) { 214 tmp_mbr=(Address)new_mbrs.elementAt(i); 215 if(!joining.contains(tmp_mbr)) 216 joining.addElement(tmp_mbr); 217 } 218 } 219 220 if(old_mbrs != null) { 222 for(Iterator it=old_mbrs.iterator(); it.hasNext();) { 223 Address addr=(Address)it.next(); 224 if(!leaving.contains(addr)) 225 leaving.add(addr); 226 } 227 } 228 if(suspected_mbrs != null) { 229 for(Iterator it=suspected_mbrs.iterator(); it.hasNext();) { 230 Address addr=(Address)it.next(); 231 if(!leaving.contains(addr)) 232 leaving.add(addr); 233 } 234 } 235 236 if(log.isDebugEnabled()) log.debug("new view is " + v); 237 return v; 238 } 239 } 240 241 242 272 public View castViewChange(Vector new_mbrs, Vector old_mbrs, Vector suspected_mbrs) { 273 View new_view; 274 275 new_view=getNextView(new_mbrs, old_mbrs, suspected_mbrs); 277 castViewChange(new_view); 278 return new_view; 279 } 280 281 282 public void castViewChange(View new_view) { 283 castViewChange(new_view, null); 284 } 285 286 287 public void castViewChange(View new_view, Digest digest) { 288 Message view_change_msg; 289 GmsHeader hdr; 290 291 if(log.isDebugEnabled()) log.debug("mcasting view {" + new_view + "} (" + new_view.size() + " mbrs)\n"); 292 view_change_msg=new Message(); hdr=new GmsHeader(GmsHeader.VIEW, new_view); 294 hdr.my_digest=digest; 295 view_change_msg.putHeader(name, hdr); 296 passDown(new Event(Event.MSG, view_change_msg)); 297 } 298 299 300 304 public void installView(View new_view, Digest digest) { 305 if(digest != null) 306 mergeDigest(digest); 307 installView(new_view); 308 } 309 310 311 314 public void installView(View new_view) { 315 Address coord; 316 int rc; 317 ViewId vid=new_view.getVid(); 318 Vector mbrs=new_view.getMembers(); 319 320 if(log.isDebugEnabled()) log.debug("[local_addr=" + local_addr + "] view is " + new_view); 321 322 if(view_id != null) { 324 rc=vid.compareTo(view_id); 325 if(rc <= 0) { 326 if(log.isDebugEnabled()) 327 log.debug("[" + local_addr + "] received view <= current view;" + 328 " discarding it (current vid: " + view_id + ", new vid: " + vid + ')'); 329 return; 330 } 331 } 332 333 ltime=Math.max(vid.getId(), ltime); 335 337 if(checkSelfInclusion(mbrs) == false) { 338 if(log.isWarnEnabled()) log.warn("checkSelfInclusion() failed, " + local_addr + 339 " is not a member of view " + new_view + "; discarding view"); 340 341 if(shun && local_addr != null && prev_members.contains(local_addr)) { 346 if(log.isWarnEnabled()) 347 log.warn("I (" + local_addr + ") am being shunned, will leave and " + 348 "rejoin group (prev_members are " + prev_members + ')'); 349 if(impl != null) 350 impl.handleExit(); 351 passUp(new Event(Event.EXIT)); 352 } 353 return; 354 } 355 356 synchronized(members) { view_id=vid.copy(); 359 360 if(mbrs != null && mbrs.size() > 0) { 362 members.set(mbrs); 363 tmp_members.set(members); 364 joining.removeAll(mbrs); leaving.retainAll(mbrs); 367 368 tmp_members.add(joining); tmp_members.remove(leaving); 371 for(Iterator it=mbrs.iterator(); it.hasNext();) { 373 Address addr=(Address)it.next(); 374 if(!prev_members.contains(addr)) 375 prev_members.add(addr); 376 } 377 } 378 379 Event view_event=new Event(Event.VIEW_CHANGE, new_view.clone()); 381 passDown(view_event); passUp(view_event); 383 384 coord=determineCoordinator(); 385 if(coord != null && coord.equals(local_addr) && !haveCoordinatorRole()) { 388 becomeCoordinator(); 389 } 390 else { 391 if(haveCoordinatorRole() && !local_addr.equals(coord)) 392 becomeParticipant(); 393 } 394 } 395 } 396 397 398 protected Address determineCoordinator() { 399 synchronized(members) { 400 return members != null && members.size() > 0? (Address)members.elementAt(0) : null; 401 } 402 } 403 404 405 406 protected boolean wouldBeNewCoordinator(Address potential_new_coord) { 407 Address new_coord=null; 408 409 if(potential_new_coord == null) return false; 410 411 synchronized(members) { 412 if(members.size() < 2) return false; 413 new_coord=(Address)members.elementAt(1); if(new_coord != null && new_coord.equals(potential_new_coord)) 415 return true; 416 return false; 417 } 418 } 419 420 421 422 protected boolean checkSelfInclusion(Vector mbrs) { 423 Object mbr; 424 if(mbrs == null) 425 return false; 426 for(int i=0; i < mbrs.size(); i++) { 427 mbr=mbrs.elementAt(i); 428 if(mbr != null && local_addr.equals(mbr)) 429 return true; 430 } 431 return false; 432 } 433 434 435 public View makeView(Vector mbrs) { 436 Address coord=null; 437 long id=0; 438 439 if(view_id != null) { 440 coord=view_id.getCoordAddress(); 441 id=view_id.getId(); 442 } 443 return new View(coord, id, mbrs); 444 } 445 446 447 public View makeView(Vector mbrs, ViewId vid) { 448 Address coord=null; 449 long id=0; 450 451 if(vid != null) { 452 coord=vid.getCoordAddress(); 453 id=vid.getId(); 454 } 455 return new View(coord, id, mbrs); 456 } 457 458 459 460 public void setDigest(Digest d) { 461 passDown(new Event(Event.SET_DIGEST, d)); 462 } 463 464 465 466 public void mergeDigest(Digest d) { 467 passDown(new Event(Event.MERGE_DIGEST, d)); 468 } 469 470 471 473 public Digest getDigest() { 474 Digest ret=null; 475 476 synchronized(digest_mutex) { 477 digest=null; 478 passDown(Event.GET_DIGEST_EVT); 479 if(digest == null) { 480 try { 481 digest_mutex.wait(digest_timeout); 482 } 483 catch(Exception ex) { 484 } 485 } 486 if(digest != null) { 487 ret=digest; 488 digest=null; 489 return ret; 490 } 491 else { 492 if(log.isErrorEnabled()) log.error("digest could not be fetched from below"); 493 return null; 494 } 495 } 496 } 497 498 499 public void up(Event evt) { 500 Object obj; 501 Message msg; 502 GmsHeader hdr; 503 MergeData merge_data; 504 505 switch(evt.getType()) { 506 507 case Event.MSG: 508 msg=(Message)evt.getArg(); 509 obj=msg.getHeader(name); 510 if(obj == null || !(obj instanceof GmsHeader)) 511 break; 512 hdr=(GmsHeader)msg.removeHeader(name); 513 switch(hdr.type) { 514 case GmsHeader.JOIN_REQ: 515 handleJoinRequest(hdr.mbr); 516 break; 517 case GmsHeader.JOIN_RSP: 518 impl.handleJoinResponse(hdr.join_rsp); 519 break; 520 case GmsHeader.LEAVE_REQ: 521 if(log.isDebugEnabled()) log.debug("received LEAVE_REQ " + hdr + " from " + msg.getSrc()); 522 if(hdr.mbr == null) { 523 if(log.isErrorEnabled()) log.error("LEAVE_REQ's mbr field is null"); 524 return; 525 } 526 impl.handleLeave(hdr.mbr, false); 528 break; 529 case GmsHeader.LEAVE_RSP: 530 impl.handleLeaveResponse(); 531 break; 532 case GmsHeader.VIEW: 533 if(hdr.view == null) { 534 if(log.isErrorEnabled()) log.error("[VIEW]: view == null"); 535 return; 536 } 537 impl.handleViewChange(hdr.view, hdr.my_digest); 538 break; 539 540 case GmsHeader.MERGE_REQ: 541 impl.handleMergeRequest(msg.getSrc(), hdr.merge_id); 542 break; 543 544 case GmsHeader.MERGE_RSP: 545 merge_data=new MergeData(msg.getSrc(), hdr.view, hdr.my_digest); 546 merge_data.merge_rejected=hdr.merge_rejected; 547 impl.handleMergeResponse(merge_data, hdr.merge_id); 548 break; 549 550 case GmsHeader.INSTALL_MERGE_VIEW: 551 impl.handleMergeView(new MergeData(msg.getSrc(), hdr.view, hdr.my_digest), hdr.merge_id); 552 break; 553 554 case GmsHeader.CANCEL_MERGE: 555 impl.handleMergeCancelled(hdr.merge_id); 556 break; 557 558 default: 559 if(log.isErrorEnabled()) log.error("GmsHeader with type=" + hdr.type + " not known"); 560 } 561 return; 563 case Event.CONNECT_OK: case Event.DISCONNECT_OK: return; 566 567 568 case Event.SET_LOCAL_ADDRESS: 569 local_addr=(Address)evt.getArg(); 570 if(print_local_addr) { 571 System.out.println("\n-------------------------------------------------------\n" + 572 "GMS: address is " + local_addr + 573 "\n-------------------------------------------------------"); 574 } 575 break; 577 case Event.SUSPECT: 578 impl.suspect((Address)evt.getArg()); 579 break; 581 case Event.UNSUSPECT: 582 impl.unsuspect((Address)evt.getArg()); 583 return; 585 case Event.MERGE: 586 impl.merge((Vector )evt.getArg()); 587 return; } 589 590 if(impl.handleUpEvent(evt)) 591 passUp(evt); 592 } 593 594 595 605 public void receiveUpEvent(Event evt) { 606 if(evt.getType() == Event.GET_DIGEST_OK) { 607 synchronized(digest_mutex) { 608 digest=(Digest)evt.getArg(); 609 digest_mutex.notifyAll(); 610 } 611 return; 612 } 613 super.receiveUpEvent(evt); 614 } 615 616 617 public void down(Event evt) { 618 switch(evt.getType()) { 619 620 case Event.CONNECT: 621 passDown(evt); 622 if(local_addr == null) 623 if(log.isFatalEnabled()) log.fatal("[CONNECT] local_addr is null"); 624 impl.join(local_addr); 625 passUp(new Event(Event.CONNECT_OK)); 626 return; 628 case Event.DISCONNECT: 629 impl.leave((Address)evt.getArg()); 630 passUp(new Event(Event.DISCONNECT_OK)); 631 initState(); break; } 634 635 if(impl.handleDownEvent(evt)) 636 passDown(evt); 637 } 638 639 640 641 public boolean setProperties(Properties props) { 642 String str; 643 644 super.setProperties(props); 645 str=props.getProperty("shun"); 646 if(str != null) { 647 shun=Boolean.valueOf(str).booleanValue(); 648 props.remove("shun"); 649 } 650 651 str=props.getProperty("merge_leader"); 652 if(str != null) { 653 merge_leader=Boolean.valueOf(str).booleanValue(); 654 props.remove("merge_leader"); 655 } 656 657 str=props.getProperty("print_local_addr"); 658 if(str != null) { 659 print_local_addr=Boolean.valueOf(str).booleanValue(); 660 props.remove("print_local_addr"); 661 } 662 663 str=props.getProperty("join_timeout"); if(str != null) { 665 join_timeout=Long.parseLong(str); 666 props.remove("join_timeout"); 667 } 668 669 str=props.getProperty("join_retry_timeout"); if(str != null) { 671 join_retry_timeout=Long.parseLong(str); 672 props.remove("join_retry_timeout"); 673 } 674 675 str=props.getProperty("leave_timeout"); if(str != null) { 677 leave_timeout=Long.parseLong(str); 678 props.remove("leave_timeout"); 679 } 680 681 str=props.getProperty("merge_timeout"); if(str != null) { 683 merge_timeout=Long.parseLong(str); 684 props.remove("merge_timeout"); 685 } 686 687 str=props.getProperty("digest_timeout"); if(str != null) { 689 digest_timeout=Long.parseLong(str); 690 props.remove("digest_timeout"); 691 } 692 693 str=props.getProperty("disable_initial_coord"); 694 if(str != null) { 695 disable_initial_coord=Boolean.valueOf(str).booleanValue(); 696 props.remove("disable_initial_coord"); 697 } 698 699 str=props.getProperty("num_prev_mbrs"); 700 if(str != null) { 701 num_prev_mbrs=Integer.parseInt(str); 702 props.remove("num_prev_mbrs"); 703 } 704 705 if(props.size() > 0) { 706 System.err.println("GMS.setProperties(): the following properties are not recognized:"); 707 props.list(System.out); 708 return false; 709 } 710 return true; 711 } 712 713 714 715 716 717 void initState() { 718 becomeClient(); 719 view_id=null; 720 } 721 722 723 void handleJoinRequest(Address mbr) { 724 JoinRsp join_rsp; 725 Message m; 726 GmsHeader hdr; 727 728 if(mbr == null) { 729 if(log.isErrorEnabled()) log.error("mbr is null"); 730 return; 731 } 732 733 if(log.isDebugEnabled()) log.debug("mbr=" + mbr); 734 735 join_rsp=impl.handleJoin(mbr); 737 if(join_rsp == null) 738 if(log.isErrorEnabled()) 739 log.error(impl.getClass().toString() + ".handleJoin(" + mbr + 740 ") returned null: will not be able to multicast new view"); 741 742 if(join_rsp != null && join_rsp.getView() != null) 746 passDown(new Event(Event.TMP_VIEW, join_rsp.getView())); 747 748 m=new Message(mbr, null, null); 750 hdr=new GmsHeader(GmsHeader.JOIN_RSP, join_rsp); 751 m.putHeader(name, hdr); 752 passDown(new Event(Event.MSG, m)); 753 754 if(join_rsp != null) 756 castViewChange(join_rsp.getView()); 757 } 758 759 760 761 762 763 764 765 766 public static class GmsHeader extends Header implements Streamable { 767 public static final int JOIN_REQ=1; 768 public static final int JOIN_RSP=2; 769 public static final int LEAVE_REQ=3; 770 public static final int LEAVE_RSP=4; 771 public static final int VIEW=5; 772 public static final int MERGE_REQ=6; 773 public static final int MERGE_RSP=7; 774 public static final int INSTALL_MERGE_VIEW=8; 775 public static final int CANCEL_MERGE=9; 776 777 int type=0; 778 View view=null; Address mbr=null; JoinRsp join_rsp=null; Digest my_digest=null; Serializable merge_id=null; boolean merge_rejected=false; 785 786 public GmsHeader() { 787 } 789 public GmsHeader(int type) { 790 this.type=type; 791 } 792 793 794 795 public GmsHeader(int type, View view) { 796 this.type=type; 797 this.view=view; 798 } 799 800 801 802 public GmsHeader(int type, Address mbr) { 803 this.type=type; 804 this.mbr=mbr; 805 } 806 807 808 public GmsHeader(int type, JoinRsp join_rsp) { 809 this.type=type; 810 this.join_rsp=join_rsp; 811 } 812 813 814 public String toString() { 815 StringBuffer sb=new StringBuffer ("GmsHeader"); 816 sb.append('[' + type2String(type) + ']'); 817 switch(type) { 818 819 case JOIN_REQ: 820 sb.append(": mbr=" + mbr); 821 break; 822 823 case JOIN_RSP: 824 sb.append(": join_rsp=" + join_rsp); 825 break; 826 827 case LEAVE_REQ: 828 sb.append(": mbr=" + mbr); 829 break; 830 831 case LEAVE_RSP: 832 break; 833 834 case VIEW: 835 sb.append(": view=" + view); 836 break; 837 838 case MERGE_REQ: 839 sb.append(": merge_id=" + merge_id); 840 break; 841 842 case MERGE_RSP: 843 sb.append(": view=" + view + ", digest=" + my_digest + ", merge_rejected=" + merge_rejected + 844 ", merge_id=" + merge_id); 845 break; 846 847 case INSTALL_MERGE_VIEW: 848 sb.append(": view=" + view + ", digest=" + my_digest); 849 break; 850 851 case CANCEL_MERGE: 852 sb.append(", <merge cancelled>, merge_id=" + merge_id); 853 break; 854 } 855 return sb.toString(); 856 } 857 858 859 public static String type2String(int type) { 860 switch(type) { 861 case JOIN_REQ: 862 return "JOIN_REQ"; 863 case JOIN_RSP: 864 return "JOIN_RSP"; 865 case LEAVE_REQ: 866 return "LEAVE_REQ"; 867 case LEAVE_RSP: 868 return "LEAVE_RSP"; 869 case VIEW: 870 return "VIEW"; 871 case MERGE_REQ: 872 return "MERGE_REQ"; 873 case MERGE_RSP: 874 return "MERGE_RSP"; 875 case INSTALL_MERGE_VIEW: 876 return "INSTALL_MERGE_VIEW"; 877 case CANCEL_MERGE: 878 return "CANCEL_MERGE"; 879 default: 880 return "<unknown>"; 881 } 882 } 883 884 885 public void writeExternal(ObjectOutput out) throws IOException { 886 out.writeInt(type); 887 out.writeObject(view); 888 out.writeObject(mbr); 889 out.writeObject(join_rsp); 890 out.writeObject(my_digest); 891 out.writeObject(merge_id); 892 out.writeBoolean(merge_rejected); 893 } 894 895 896 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 897 type=in.readInt(); 898 view=(View)in.readObject(); 899 mbr=(Address)in.readObject(); 900 join_rsp=(JoinRsp)in.readObject(); 901 my_digest=(Digest)in.readObject(); 902 merge_id=(Serializable)in.readObject(); 903 merge_rejected=in.readBoolean(); 904 } 905 906 907 public void writeTo(DataOutputStream out) throws IOException { 908 out.writeInt(type); 909 Util.writeStreamable(view, out); 910 Util.writeAddress(mbr, out); 911 Util.writeStreamable(join_rsp, out); 912 Util.writeStreamable(my_digest, out); 913 Util.writeStreamable((Streamable)merge_id, out); out.writeBoolean(merge_rejected); 915 } 916 917 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException , InstantiationException { 918 type=in.readInt(); 919 view=(View)Util.readStreamable(View.class, in); 920 mbr=Util.readAddress(in); 921 join_rsp=(JoinRsp)Util.readStreamable(JoinRsp.class, in); 922 my_digest=(Digest)Util.readStreamable(Digest.class, in); 923 merge_id=(Serializable)Util.readStreamable(ViewId.class, in); 924 merge_rejected=in.readBoolean(); 925 } 926 927 } 928 929 930 } 931 | Popular Tags |