1 3 package org.jgroups.protocols; 4 5 6 import org.jgroups.*; 7 import org.jgroups.blocks.GroupRequest; 8 import org.jgroups.blocks.MethodCall; 9 import org.jgroups.stack.Protocol; 10 import org.jgroups.stack.RpcProtocol; 11 import org.jgroups.util.Queue; 12 import org.jgroups.util.QueueClosedException; 13 14 import java.util.Hashtable ; 15 import java.util.Properties ; 16 import java.util.Vector ; 17 18 19 26 public class GMS extends RpcProtocol implements Runnable { 27 private GmsImpl impl=null; 28 public Address local_addr=null; 29 public String group_addr=null; 30 public final Membership members=new Membership(); 31 public ViewId view_id=null; 32 public long ltime=0; 33 public long join_timeout=5000; 34 public long join_retry_timeout=2000; 35 private long flush_timeout=0; private long rebroadcast_timeout=0; private long view_change_timeout=10000; public long leave_timeout=5000; 39 public final Object impl_mutex=new Object (); public final Object view_mutex=new Object (); private Queue event_queue=new Queue(); private Thread evt_thread=null; 43 private final Object flush_mutex=new Object (); 44 private FlushRsp flush_rsp=null; 45 private final Object rebroadcast_mutex=new Object (); 46 private boolean rebroadcast_unstable_msgs=true; 47 private boolean print_local_addr=true; 48 boolean disable_initial_coord=false; private final Hashtable impls=new Hashtable (); 50 static final String CLIENT="Client"; 51 static final String COORD="Coordinator"; 52 static final String PART="Participant"; 53 54 55 public GMS() { 56 initState(); 57 } 58 59 60 public String getName() { 61 return "GMS"; 62 } 63 64 public Vector requiredDownServices() { 65 Vector retval=new Vector (); 66 retval.addElement(new Integer (Event.FLUSH)); 67 retval.addElement(new Integer (Event.FIND_INITIAL_MBRS)); 68 return retval; 69 } 70 71 72 public void setImpl(GmsImpl new_impl) { 73 synchronized(impl_mutex) { 74 impl=new_impl; 75 if(log.isInfoEnabled()) log.info("changed role to " + new_impl.getClass().getName()); 76 } 77 } 78 79 80 public void start() throws Exception { 81 super.start(); 82 if(checkForViewEnforcer(up_prot) == false) { 83 if(log.isWarnEnabled()) log.warn("I need protocol layer " + 84 "VIEW_ENFORCER above me to discard messages sent to me while I'm " + 85 "not yet a group member ! Otherwise, these messages will be delivered " + 86 "to the application without checking...\n"); 87 } 88 89 if(_corr != null) 90 _corr.setDeadlockDetection(true); 91 else 92 throw new Exception ("GMS.start(): cannot set deadlock detection in corr, as it is null !"); 93 } 94 95 96 public void becomeCoordinator() { 97 CoordGmsImpl tmp=(CoordGmsImpl)impls.get(COORD); 98 99 if(tmp == null) { 100 tmp=new CoordGmsImpl(this); 101 tmp.leaving=false; 102 tmp.received_last_view=false; impls.put(COORD, tmp); 104 } 105 106 setImpl(tmp); 107 } 108 109 110 public void becomeParticipant() { 111 ParticipantGmsImpl tmp=(ParticipantGmsImpl)impls.get(PART); 112 113 if(tmp == null) { 114 tmp=new ParticipantGmsImpl(this); 115 tmp.leaving=false; 116 tmp.received_final_view=false; 117 impls.put(PART, tmp); 118 } 119 setImpl(tmp); 120 } 121 122 public void becomeClient() { 123 ClientGmsImpl tmp=(ClientGmsImpl)impls.get(CLIENT); 124 125 if(tmp == null) { 126 tmp=new ClientGmsImpl(this); 127 impls.put(CLIENT, tmp); 128 } 129 else 130 tmp.init(); 131 132 setImpl(tmp); 133 } 134 135 boolean haveCoordinatorRole() { 136 return impl != null && impl instanceof CoordGmsImpl; 137 } 138 139 140 144 public View getNextView(Vector new_mbrs, Vector old_mbrs, Vector suspected_mbrs) { 145 Vector mbrs; 146 long vid=0; 147 View v; 148 Membership tmp_mbrs; 149 Vector mbrs_to_remove=new Vector (); 150 151 if(old_mbrs != null && old_mbrs.size() > 0) 152 for(int i=0; i < old_mbrs.size(); i++) 153 mbrs_to_remove.addElement(old_mbrs.elementAt(i)); 154 if(suspected_mbrs != null && suspected_mbrs.size() > 0) 155 for(int i=0; i < suspected_mbrs.size(); i++) 156 if(!mbrs_to_remove.contains(suspected_mbrs.elementAt(i))) 157 mbrs_to_remove.addElement(suspected_mbrs.elementAt(i)); 158 159 synchronized(view_mutex) { 160 vid=Math.max(view_id.getId(), ltime) + 1; 161 ltime=vid; 162 tmp_mbrs=members.copy(); 163 tmp_mbrs.merge(new_mbrs, mbrs_to_remove); 164 mbrs=(Vector )tmp_mbrs.getMembers().clone(); 165 v=new View(local_addr, vid, mbrs); 166 return v; 167 } 168 } 169 170 171 176 Vector computeFlushDestination(Vector suspected_mbrs) { 177 Vector ret=members.getMembers(); if(suspected_mbrs != null && suspected_mbrs.size() > 0) 179 for(int i=0; i < suspected_mbrs.size(); i++) 180 ret.removeElement(suspected_mbrs.elementAt(i)); 181 return ret; 182 } 183 184 185 190 Vector computeViewDestination(Vector new_mbrs, Vector old_mbrs, Vector suspected_mbrs) { 191 Vector ret=members.getMembers(); Address mbr; 193 194 if(new_mbrs != null) { 196 for(int i=0; i < new_mbrs.size(); i++) { 197 mbr=(Address)new_mbrs.elementAt(i); 198 if(!ret.contains(mbr)) 199 ret.addElement(new_mbrs.elementAt(i)); 200 } 201 } 202 203 205 206 if(suspected_mbrs != null) { 208 for(int i=0; i < suspected_mbrs.size(); i++) { 209 mbr=(Address)suspected_mbrs.elementAt(i); 210 ret.removeElement(suspected_mbrs.elementAt(i)); 211 } 212 } 213 return ret; 214 } 215 216 222 223 public void flush(Vector flush_dest, Vector suspected_mbrs) { 224 Vector rebroadcast_msgs=new Vector (); 225 226 if(suspected_mbrs == null) 227 suspected_mbrs=new Vector (); 228 229 while(flush_dest.size() > 0) { 230 flush_rsp=null; 231 synchronized(flush_mutex) { 232 passDown(new Event(Event.FLUSH, flush_dest)); if(flush_rsp == null) { 234 try { 235 flush_mutex.wait(flush_timeout); 236 } 237 catch(Exception e) { 238 } 239 } 240 } 241 if(flush_rsp == null) { 242 break; 243 } 244 245 if(rebroadcast_unstable_msgs && flush_rsp.unstable_msgs != null && 246 flush_rsp.unstable_msgs.size() > 0) { 247 Message m; 248 for(int i=0; i < flush_rsp.unstable_msgs.size(); i++) { 249 m=(Message)flush_rsp.unstable_msgs.elementAt(i); 250 251 rebroadcast_msgs.addElement(m); 254 } 255 } 256 257 if(flush_rsp.result == true) 258 break; 259 else { 260 if(flush_rsp.failed_mbrs != null) { 261 for(int i=0; i < flush_rsp.failed_mbrs.size(); i++) { 262 flush_dest.removeElement(flush_rsp.failed_mbrs.elementAt(i)); 263 suspected_mbrs.addElement(flush_rsp.failed_mbrs.elementAt(i)); 264 } 265 } 266 } 267 } if(log.isInfoEnabled()) log.info("flushing completed."); 269 270 271 if(rebroadcast_unstable_msgs && rebroadcast_msgs.size() > 0) { 273 274 if(log.isInfoEnabled()) log.info("re-broadcasting unstable messages (" + 275 rebroadcast_msgs.size() + ')'); 276 synchronized(rebroadcast_mutex) { 278 passDown(new Event(Event.REBROADCAST_MSGS, rebroadcast_msgs)); 279 try { 280 rebroadcast_mutex.wait(rebroadcast_timeout); 281 } 282 catch(Exception e) { 283 } 284 } 285 if(log.isInfoEnabled()) log.info("re-broadcasting messages completed"); 286 } 287 } 288 289 321 public void castViewChange(Vector new_mbrs, Vector old_mbrs, Vector suspected_mbrs) { 322 View new_view, tmp_view; 323 ViewId new_vid; 324 Vector flush_dest=computeFlushDestination(suspected_mbrs); Vector view_dest=computeViewDestination(new_mbrs, old_mbrs, suspected_mbrs); 327 new_view=getNextView(new_mbrs, old_mbrs, suspected_mbrs); 329 new_vid=new_view.getVid(); 330 331 if(log.isInfoEnabled()) log.info("FLUSH phase, flush_dest: " + flush_dest + 332 "\n\tview_dest: " + view_dest + "\n\tnew_view: " + new_view + '\n'); 333 flush(flush_dest, suspected_mbrs); 334 if(log.isInfoEnabled()) 335 log.info("FLUSH phase done"); 336 337 341 view_dest=computeViewDestination(new_mbrs, old_mbrs, suspected_mbrs); 342 tmp_view=new View(null, view_dest); 343 344 Event view_event=new Event(Event.TMP_VIEW, tmp_view); passDown(view_event); 347 if(log.isInfoEnabled()) log.info("mcasting view {" + new_vid + ", " + view_dest + '}'); 348 passDown(new Event(Event.SWITCH_NAK_ACK)); Object [] args=new Object []{new_vid, new_view.getMembers() }; 350 MethodCall call=new MethodCall("handleViewChange", args, new String []{ViewId.class.getName(), Vector .class.getName()}); 351 callRemoteMethods(view_dest, call, 353 GroupRequest.GET_ALL, view_change_timeout); 354 if(log.isInfoEnabled()) log.info("mcasting view completed"); 355 passDown(new Event(Event.SWITCH_NAK)); } 357 358 359 363 public void installView(ViewId new_view, Vector mbrs) { 364 Object coord; 365 int rc; 366 367 synchronized(view_mutex) { ltime=Math.max(new_view.getId(), ltime); if(log.isInfoEnabled()) log.info("received view change, vid=" + new_view); 370 371 373 if(checkSelfInclusion(mbrs) == false) { 374 if(log.isWarnEnabled()) log.warn("I'm not member of " + mbrs + ", discarding"); 375 return; 376 } 377 378 379 if(view_id == null) { 380 if(new_view == null) { 381 if(log.isErrorEnabled()) log.error("view_id and new_view are null !"); 382 return; 383 } 384 else { view_id=(ViewId)new_view.clone(); 386 } 387 } 388 else { 389 if(new_view == null) { if(log.isErrorEnabled()) log.error("new_view is null !"); 391 return; 392 } 393 else { rc=new_view.compareTo(view_id); if(rc <= 0) { { 397 if(log.isWarnEnabled()) log.warn("received view <= current view; discarding it ! " + 398 "(view_id: " + view_id + ", new_view: " + new_view + ')'); 399 } 400 return; 401 } 402 else { 404 if(new_view.getCoordAddress() != null) { 405 view_id=new ViewId(new_view.getCoordAddress(), new_view.getId()); 406 } 407 else { 408 view_id=new ViewId(view_id.getCoordAddress(), new_view.getId()); 409 } 410 } 411 } 412 } 413 414 if(mbrs != null && mbrs.size() > 0) 415 members.set(mbrs); 416 417 418 419 Event view_event=new Event(Event.VIEW_CHANGE, makeView(members.getMembers())); 421 passDown(view_event); passUp(view_event); 423 424 coord=determineCoordinator(); 425 if(coord != null && coord.equals(local_addr)) { 426 if (! haveCoordinatorRole()) becomeCoordinator(); 428 } 429 else { 430 if(haveCoordinatorRole() && !local_addr.equals(coord)) 431 becomeParticipant(); 432 } 433 } 434 } 435 436 437 protected Address determineCoordinator() { 438 synchronized(members) { 439 return members != null && members.size() > 0? (Address)members.elementAt(0) : null; 440 } 441 } 442 443 444 447 protected boolean checkSelfInclusion(Vector mbrs) { 448 Object mbr; 449 if(mbrs == null) 450 return false; 451 for(int i=0; i < mbrs.size(); i++) { 452 mbr=mbrs.elementAt(i); 453 if(mbr != null && local_addr.equals(mbr)) 454 return true; 455 } 456 return false; 457 } 458 459 460 public View makeView(Vector mbrs) { 461 Address coord=null; 462 long id=0; 463 464 if(view_id != null) { 465 coord=view_id.getCoordAddress(); 466 id=view_id.getId(); 467 } 468 return new View(coord, id, mbrs); 469 } 470 471 472 public View makeView(Vector mbrs, ViewId vid) { 473 Address coord=null; 474 long id=0; 475 476 if(vid != null) { 477 coord=vid.getCoordAddress(); 478 id=vid.getId(); 479 } 480 return new View(coord, id, mbrs); 481 } 482 483 484 485 486 487 488 public void join(Address mbr) { 489 synchronized(impl_mutex) { 490 impl.join(mbr); 491 } 492 } 493 494 public void leave(Address mbr) { 495 synchronized(impl_mutex) { 496 impl.leave(mbr); 497 } 498 } 499 500 public void suspect(Address mbr) { 501 synchronized(impl_mutex) { 502 impl.suspect(mbr); 503 } 504 } 505 506 public void merge(Vector other_coords) { 507 synchronized(impl_mutex) { 508 impl.merge(other_coords); 509 } 510 } 511 512 public boolean handleJoin(Address mbr) { 513 synchronized(impl_mutex) { 514 return impl.handleJoin(mbr); 515 } 516 } 517 518 public void handleLeave(Address mbr, boolean suspected) { 519 synchronized(impl_mutex) { 520 impl.handleLeave(mbr, suspected); 521 } 522 } 523 524 public void handleViewChange(ViewId new_view, Vector mbrs) { 525 impl.handleViewChange(new_view, mbrs); 527 } 529 530 public View handleMerge(ViewId other_vid, Vector other_members) { 531 synchronized(impl_mutex) { 532 if(log.isTraceEnabled()) 533 { 534 View v=impl.handleMerge(other_vid, other_members); 535 if(log.isInfoEnabled()) log.info("returning view: " + v); 536 return v; 537 } 538 return impl.handleMerge(other_vid, other_members); 539 } 540 } 541 542 public void handleSuspect(Address mbr) { 543 synchronized(impl_mutex) { 544 impl.handleSuspect(mbr); 545 } 546 } 547 548 549 550 551 552 boolean checkForViewEnforcer(Protocol up_protocol) { 553 String prot_name; 554 555 if(up_protocol == null) 556 return false; 557 prot_name=up_protocol.getName(); 558 if(prot_name != null && "VIEW_ENFORCER".equals(prot_name)) 559 return true; 560 return checkForViewEnforcer(up_protocol.getUpProtocol()); 561 } 562 563 564 571 public boolean handleUpEvent(Event evt) { 572 switch(evt.getType()) { 573 574 case Event.CONNECT_OK: case Event.DISCONNECT_OK: return false; 577 578 579 case Event.SET_LOCAL_ADDRESS: 580 local_addr=(Address)evt.getArg(); 581 582 if(print_local_addr) { 583 System.out.println("\n-------------------------------------------------------\n" + 584 "GMS: address is " + local_addr + 585 "\n-------------------------------------------------------"); 586 } 587 return true; 589 case Event.SUSPECT: 590 try { 591 event_queue.add(evt); 592 } 593 catch(Exception e) { 594 } 595 return true; 597 case Event.MERGE: 598 try { 599 event_queue.add(evt); 600 } 601 catch(Exception e) { 602 } 603 return false; 605 606 case Event.FLUSH_OK: 607 synchronized(flush_mutex) { 608 flush_rsp=(FlushRsp)evt.getArg(); 609 flush_mutex.notifyAll(); 610 } 611 return false; 613 case Event.REBROADCAST_MSGS_OK: 614 synchronized(rebroadcast_mutex) { 615 rebroadcast_mutex.notifyAll(); 616 } 617 return false; } 619 620 return impl.handleUpEvent(evt); 621 } 622 623 624 631 public boolean handleDownEvent(Event evt) { 632 switch(evt.getType()) { 633 634 case Event.CONNECT: 635 passDown(evt); 636 try { 637 group_addr=(String )evt.getArg(); 638 } 639 catch(ClassCastException cce) { 640 if(log.isErrorEnabled()) log.error("group address must " + 641 "be a string (group name) to make sense"); 642 } 643 impl.join(local_addr); 644 passUp(new Event(Event.CONNECT_OK)); 645 startEventHandlerThread(); 646 return false; 648 case Event.DISCONNECT: 649 impl.leave((Address)evt.getArg()); 650 passUp(new Event(Event.DISCONNECT_OK)); 651 stopEventHandlerThread(); 652 initState(); 653 return true; } 655 656 return impl.handleDownEvent(evt); 657 } 658 659 660 public void receiveDownEvent(Event evt) { 663 if(evt.getType() == Event.BLOCK_OK) { 664 passDown(evt); 665 return; 666 } 667 super.receiveDownEvent(evt); 668 } 669 670 671 674 public boolean setProperties(Properties props) { 675 String str; 676 677 super.setProperties(props); 678 str=props.getProperty("join_timeout"); if(str != null) { 680 join_timeout=Long.parseLong(str); 681 props.remove("join_timeout"); 682 } 683 684 str=props.getProperty("print_local_addr"); 685 if(str != null) { 686 print_local_addr=Boolean.valueOf(str).booleanValue(); 687 props.remove("print_local_addr"); 688 } 689 690 str=props.getProperty("view_change_timeout"); if(str != null) { 692 view_change_timeout=Long.parseLong(str); 693 props.remove("view_change_timeout"); 694 } 695 696 str=props.getProperty("join_retry_timeout"); if(str != null) { 698 join_retry_timeout=Long.parseLong(str); 699 props.remove("join_retry_timeout"); 700 } 701 702 str=props.getProperty("leave_timeout"); if(str != null) { 704 leave_timeout=Long.parseLong(str); 705 props.remove("leave_timeout"); 706 } 707 708 str=props.getProperty("flush_timeout"); if(str != null) { 710 flush_timeout=Long.parseLong(str); 711 props.remove("flush_timeout"); 712 } 713 714 str=props.getProperty("rebroadcast_unstable_msgs"); if(str != null) { 716 rebroadcast_unstable_msgs=Boolean.valueOf(str).booleanValue(); 717 props.remove("rebroadcast_unstable_msgs"); 718 } 719 720 str=props.getProperty("rebroadcast_timeout"); if(str != null) { 722 rebroadcast_timeout=Long.parseLong(str); 723 props.remove("rebroadcast_timeout"); 724 } 725 726 str=props.getProperty("disable_initial_coord"); if(str != null) { 728 disable_initial_coord=Boolean.valueOf(str).booleanValue(); 729 props.remove("disable_initial_coord"); 730 } 731 732 if(props.size() > 0) { 733 System.err.println("GMS.setProperties(): the following properties are not recognized:"); 734 props.list(System.out); 735 return false; 736 } 737 return true; 738 } 739 740 741 public void run() { 742 Event evt; 743 744 while(evt_thread != null && event_queue != null) { 745 try { 746 evt=(Event)event_queue.remove(); 747 switch(evt.getType()) { 748 case Event.SUSPECT: 749 impl.suspect((Address)evt.getArg()); 750 break; 751 case Event.MERGE: 752 impl.merge((Vector )evt.getArg()); 753 break; 754 default: 755 if(log.isErrorEnabled()) log.error("event handler thread encountered event of type " + 756 Event.type2String(evt.getType()) + ": not handled by me !"); 757 break; 758 } 759 } 760 catch(QueueClosedException closed) { 761 break; 762 } 763 catch(Exception ex) { 764 if(log.isWarnEnabled()) log.warn("exception=" + ex); 765 } 766 } 767 } 768 769 770 771 772 773 774 void initState() { 775 becomeClient(); 776 impl.init(); 777 view_id=null; 778 if(members != null) 779 members.clear(); 780 } 781 782 783 private void startEventHandlerThread() { 784 if(event_queue == null) 785 event_queue=new Queue(); 786 if(evt_thread == null) { 787 evt_thread=new Thread (this, "GMS.EventHandlerThread"); 788 evt_thread.setDaemon(true); 789 evt_thread.start(); 790 } 791 } 792 793 794 private void stopEventHandlerThread() { 795 if(evt_thread != null) { 796 event_queue.close(false); 797 event_queue=null; 798 evt_thread=null; 799 return; 800 } 801 802 if(event_queue != null) { 803 event_queue.close(false); 804 event_queue=null; 805 } 806 } 807 808 809 } 810 | Popular Tags |