1 3 package org.jgroups.protocols; 4 5 6 import org.jgroups.*; 7 import org.jgroups.blocks.GroupRequest; 8 import org.jgroups.blocks.MethodCall; 9 import org.jgroups.stack.RpcProtocol; 10 import org.jgroups.util.TimeScheduler; 11 import org.jgroups.util.Util; 12 13 import java.util.Properties ; 14 import java.util.Vector ; 15 16 17 51 public class STABLE extends RpcProtocol { 52 53 private static final String PROT_NAME="STABLE"; 54 55 56 private static final double SUBSET_SIZE=0.1; 57 58 59 private static final int GOSSIP_MSG_INTERVAL=100; 60 61 62 private static final int GOSSIP_INTERVAL=10000; 63 64 private Address local_addr=null; 65 private ViewId vid=null; 66 private final Vector mbrs=new Vector (11); 67 68 69 private long round=1; 70 71 72 private long[] seqnos=new long[0]; 73 74 75 private boolean[] heard_from=new boolean[0]; 76 77 78 private double subset=SUBSET_SIZE; 79 80 81 private TimeScheduler sched=null; 82 83 private Task gossip_task; 84 85 86 private int max_msgs=GOSSIP_MSG_INTERVAL; 87 88 89 private long max_wait_time=GOSSIP_INTERVAL; 90 91 92 private long num_msgs=max_msgs; 93 94 95 private final Object highest_seqnos_mutex=new Object (); 96 97 98 private long highest_seqnos_timeout=4000; 99 100 101 104 public String getName() { 105 return (PROT_NAME); 106 } 107 108 109 119 public Vector requiredUpServices() { 120 Vector retval=new Vector (1); 121 retval.addElement(new Integer (Event.GET_MSGS_RECEIVED)); 122 return retval; 123 } 124 125 146 public boolean setProperties(Properties props) { 147 String str; 148 149 super.setProperties(props); 150 str=props.getProperty("subset"); 151 if(str != null) { 152 subset=Float.parseFloat(str); 153 props.remove("subset"); 154 } 155 156 str=props.getProperty("max_msgs"); 157 if(str != null) { 158 num_msgs=max_msgs=Integer.parseInt(str); 159 if(max_msgs <= 1) { 160 if(log.isFatalEnabled()) log.fatal("value for 'max_msgs' must be greater than 1 !"); 161 return false; 162 } 163 props.remove("max_msgs"); 164 } 165 166 str=props.getProperty("max_wait_time"); 167 if(str != null) { 168 max_wait_time=Long.parseLong(str); 169 props.remove("max_wait_time"); 170 } 171 172 str=props.getProperty("highest_seqnos_timeout"); 173 if(str != null) { 174 highest_seqnos_timeout=Long.parseLong(str); 175 props.remove("highest_seqnos_timeout"); 176 } 177 178 if(props.size() > 0) { 179 System.err.println("STABLE.setProperties(): these properties " + 180 "are not recognized:"); 181 props.list(System.out); 182 return false; 183 } 184 return true; 185 } 186 187 188 194 public void start() throws Exception { 195 TimeScheduler timer; 196 197 super.start(); 198 timer=stack != null ? stack.timer : null; 199 if(timer == null) 200 throw new Exception ("STABLE.start(): timer is null"); 201 202 sched=timer; 203 204 if(_corr != null) 206 _corr.setDeadlockDetection(false); 207 initialize(); 208 startGossip(); 209 } 210 211 212 215 public void stop() { 216 super.stop(); 217 synchronized(this) { 218 if(gossip_task != null) 219 gossip_task.cancel(); 220 gossip_task=null; 221 } 222 } 223 224 225 226 227 249 public void gossip(ViewId view_id, long gossip_round, 250 long[] gossip_seqnos, boolean[] heard, Object sender) { 251 Object [] params; 252 MethodCall call; 253 254 synchronized(this) { 255 256 if(log.isInfoEnabled()) log.info("sender=" + sender + ", round=" + gossip_round + ", seqnos=" + 257 Util.array2String(gossip_seqnos) + ", heard=" + 258 Util.array2String(heard)); 259 if(vid == null || view_id == null || !vid.equals(view_id)) { 260 261 if(log.isInfoEnabled()) log.info("view ID s are different (" + vid + " != " + view_id + 262 "). Discarding gossip received"); 263 return; 264 } 265 if(gossip_round < this.round) { 266 267 if(log.isInfoEnabled()) log.info("received a gossip from a previous round (" + 268 gossip_round + "); my round is " + round + 269 ". Discarding gossip"); 270 return; 271 } 272 if(gossip_seqnos == null || seqnos == null || 273 seqnos.length != gossip_seqnos.length) { 274 275 if(log.isWarnEnabled()) log.warn("size of seqnos and gossip_seqnos are not equal ! " + 276 "Discarding gossip"); 277 return; 278 } 279 280 if(round == gossip_round) { 288 update(sender, gossip_seqnos, heard); 289 } 290 else if(round < gossip_round) { 291 292 if(log.isInfoEnabled()) log.info("received a gossip from a higher round (" + 293 gossip_round + "); adopting my round (" + round + 294 ") to " + gossip_round); 295 round=gossip_round; 296 set(sender, gossip_seqnos, heard_from); 297 } 298 299 if(log.isInfoEnabled()) log.info("heard_from=" + Util.array2String(heard_from)); 300 if(!heardFromAll()) 301 return; 302 303 params=new Object []{ 304 vid.clone(), 305 new Long (gossip_round), 306 seqnos.clone(), 307 local_addr}; 308 } 310 call=new MethodCall("stability", params, 311 new String [] {ViewId.class.getName(), long.class.getName(), long[].class.getName(), Object .class.getName()}); 312 callRemoteMethods(null, call, GroupRequest.GET_NONE, 0); 313 } 314 315 316 320 public void stability(ViewId view_id, long gossip_round, long[] stability_vector, Object sender) { 321 synchronized(this) { 325 326 if(log.isInfoEnabled()) log.info("sender=" + sender + ", round=" + gossip_round + ", vector=" + 327 Util.array2String(stability_vector) + ')'); 328 if(vid == null || view_id == null || !vid.equals(view_id)) { 329 330 if(log.isInfoEnabled()) log.info("view ID s are different (" + vid + " != " + view_id + 331 "). Discarding gossip received"); 332 return; 333 } 334 335 if(round > gossip_round) 336 return; 337 round=gossip_round + 1; 338 for(int i=0; i < heard_from.length; i++) 339 heard_from[i]=false; 340 } 341 heard_from[mbrs.indexOf(local_addr)]=true; 342 343 passUp(new Event(Event.STABLE, stability_vector)); 344 getHighestSeqnos(); 345 } 346 347 348 349 358 public boolean handleUpEvent(Event evt) { 359 switch(evt.getType()) { 360 case Event.MSG: 361 if(!upMsg(evt)) 362 return (false); 363 break; 364 case Event.SET_LOCAL_ADDRESS: 365 local_addr=(Address)evt.getArg(); 366 break; 367 } 368 369 return true; 370 } 371 372 373 382 public boolean handleDownEvent(Event evt) { 383 switch(evt.getType()) { 384 case Event.VIEW_CHANGE: 385 if(!downViewChange(evt)) 386 return (false); 387 break; 388 case Event.GET_MSGS_RECEIVED_OK: 390 if(!downGetMsgsReceived(evt)) 391 return (false); 392 break; 393 } 394 395 return (true); 396 } 397 398 399 402 private void gossipRun() { 403 num_msgs=max_msgs; 404 sendGossip(); 405 } 406 407 408 415 private void initialize() { 416 synchronized(this) { 417 seqnos=new long[mbrs.size()]; 418 for(int i=0; i < seqnos.length; i++) 419 seqnos[i]=-1; 420 421 heard_from=new boolean[mbrs.size()]; 422 for(int i=0; i < heard_from.length; i++) 423 heard_from[i]=false; 424 } 425 } 426 427 428 448 private void update(Object sender, long[] gossip_seqnos, 449 boolean[] gossip_heard_from) { 450 int index; 451 452 synchronized(this) { 453 index=mbrs.indexOf(sender); 454 if(index < 0) { 455 if(log.isWarnEnabled()) log.warn("sender " + sender + " not found in mbrs !"); 456 return; 457 } 458 459 for(int i=0; i < gossip_seqnos.length; i++) 460 seqnos[i]=Math.min(seqnos[i], gossip_seqnos[i]); 461 462 heard_from[index]=true; 463 for(int i=0; i < heard_from.length; i++) 464 heard_from[i]=heard_from[i] | gossip_heard_from[i]; 465 } 466 } 467 468 469 484 private void set(Object sender, long[] gossip_seqnos, 485 boolean[] gossip_heard_from) { 486 int index; 487 488 synchronized(this) { 489 index=mbrs.indexOf(sender); 490 if(index < 0) { 491 if(log.isWarnEnabled()) log.warn("sender " + sender + " not found in mbrs !"); 492 return; 493 } 494 495 seqnos=gossip_seqnos; 496 heard_from=gossip_heard_from; 497 } 498 } 499 500 501 505 private boolean heardFromAll() { 506 synchronized(this) { 507 if(heard_from == null) return false; 508 for(int i=0; i < heard_from.length; i++) 509 if(!heard_from[i]) 510 return false; 511 } 512 513 return true; 514 } 515 516 517 520 private void sendGossip() { 521 Vector gossip_subset; 522 Object [] params; 523 MethodCall call; 524 525 synchronized(this) { 526 gossip_subset=Util.pickSubset(mbrs, subset); 527 if(gossip_subset == null || gossip_subset.size() < 1) { 528 if(log.isWarnEnabled()) log.warn("picked empty subset !"); 529 return; 530 } 531 532 533 if(log.isInfoEnabled()) log.info("subset=" + gossip_subset + ", round=" + round + ", seqnos=" + 534 Util.array2String(seqnos)); 535 536 params=new Object []{ 537 vid.clone(), 538 new Long (round), 539 seqnos.clone(), 540 heard_from.clone(), 541 local_addr}; 542 } 543 544 call=new MethodCall("gossip", params, 545 new String [] {ViewId.class.getName(), long.class.getName(), long[].class.getName(), boolean[].class.getName(), Object .class.getName()}); 546 for(int i=0; i < gossip_subset.size(); i++) { 547 try { 548 callRemoteMethod((Address)gossip_subset.get(i), call, GroupRequest.GET_NONE, 0); 549 } 550 catch(Exception e) { 551 if(log.isDebugEnabled()) log.debug("exception=" + e); 552 } 553 } 554 } 555 556 557 562 private void getHighestSeqnos() { 563 synchronized(highest_seqnos_mutex) { 564 passUp(new Event(Event.GET_MSGS_RECEIVED)); 565 566 try { 567 highest_seqnos_mutex.wait(highest_seqnos_timeout); 568 } 569 catch(InterruptedException e) { 570 571 if(log.isErrorEnabled()) log.error("Interrupted while waiting for highest seqnos from NAKACK"); 572 } 573 } 574 } 575 576 577 580 private void startGossip() { 581 synchronized(this) { 582 if(gossip_task != null) 583 gossip_task.cancel(); 584 gossip_task=new Task(new Times(new long[]{GOSSIP_INTERVAL})); 585 sched.add(gossip_task); 586 } 587 } 588 589 590 599 private boolean upMsg(Event e) { 600 Message msg=(Message)e.getArg(); 601 602 if(msg.getDest() != null && (!msg.getDest().isMulticastAddress())) 603 return (true); 604 605 synchronized(this) { 606 --num_msgs; 607 if(num_msgs > 0) 608 return (true); 609 num_msgs=max_msgs; 610 611 gossip_task.cancel(); 612 gossip_task=new Task(new Times(new long[]{0, GOSSIP_INTERVAL})); 613 sched.add(gossip_task); 614 } 615 616 return (true); 617 } 618 619 620 629 private boolean downViewChange(Event e) { 630 View v=(View)e.getArg(); 631 Vector new_mbrs=v.getMembers(); 632 633 642 643 synchronized(this) { 644 vid=v.getVid(); 645 mbrs.clear(); 646 mbrs.addAll(new_mbrs); 647 initialize(); 648 } 649 650 return (true); 651 } 652 653 654 662 private boolean downGetMsgsReceived(Event e) { 663 long[] new_seqnos=(long[])e.getArg(); 664 665 try { 666 synchronized(this) { 667 if(new_seqnos == null) 668 return (true); 669 if(new_seqnos.length != seqnos.length) { 670 671 if(log.isInfoEnabled()) log.info("GET_MSGS_RECEIVED: array of highest " + 672 "seqnos seen so far (received from NAKACK layer) " + 673 "has a different length (" + new_seqnos.length + 674 ") from 'seqnos' array (" + seqnos.length + ')'); 675 return (true); 676 } 677 System.arraycopy(new_seqnos, 0, seqnos, 0, seqnos.length); 678 } 679 680 } 681 finally { 682 synchronized(highest_seqnos_mutex) { 683 highest_seqnos_mutex.notifyAll(); 684 } 685 } 686 687 return (true); 688 } 689 690 691 696 private static class Times { 697 private int next=0; 698 private long[] times; 699 700 public Times(long[] times) { 701 if(times.length == 0) 702 throw new IllegalArgumentException ("times"); 703 this.times=times; 704 } 705 706 public synchronized long next() { 707 if(next >= times.length) 708 return (times[times.length - 1]); 709 else 710 return (times[next++]); 711 } 712 713 public long[] times() { 714 return (times); 715 } 716 717 public synchronized void reset() { 718 next=0; 719 } 720 } 721 722 723 726 private class Task implements TimeScheduler.Task { 727 private final Times intervals; 728 private boolean cancelled=false; 729 730 public Task(Times intervals) { 731 this.intervals=intervals; 732 } 733 734 public long nextInterval() { 735 return (intervals.next()); 736 } 737 738 public boolean cancelled() { 739 return (cancelled); 740 } 741 742 public void cancel() { 743 cancelled=true; 744 } 745 746 public void run() { 747 gossipRun(); 748 } 749 } 750 } 751 | Popular Tags |