package org.jgroups.protocols.pbcast;

import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.stack.NakReceiverWindow;
import org.jgroups.stack.Protocol;
import org.jgroups.util.List;
import org.jgroups.util.Queue;
import org.jgroups.util.QueueClosedException;
import org.jgroups.util.Util;

import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Properties;
import java.util.Vector;


/**
 * Gossip-based multicast protocol layer.
 * <p>
 * Outgoing multicast messages are tagged with a per-sender sequence number and kept in a
 * {@link NakReceiverWindow}. Incoming multicast messages are inserted into the window of their
 * sender and delivered from there. A background thread periodically sends a gossip carrying the
 * local digest (lowest/highest seqno seen per sender); a receiver that is behind answers with a
 * retransmission request (XMIT_REQ), which is served with an XMIT_RSP containing message copies.
 * <p>
 * Gossips, retransmission requests/responses and NOT_MEMBER notifications are not processed on
 * the caller's thread; they are queued and handled by a separate {@link GossipHandler} thread.
 */
public class PBCAST extends Protocol implements Runnable {
    /** Messages received via {@link #up(Event)} are discarded until BECOME_SERVER was seen. */
    boolean operational=false;

    /** Sequence number assigned to the next multicast message sent by this member. */
    long seqno=1;

    /** Round number stamped on outgoing gossips; incremented after each {@link #sendGossip()}. */
    long gossip_round=1;

    /** Own address, set by the SET_LOCAL_ADDRESS event. */
    Address local_addr=null;

    /** Maps sender (Address) to its NakReceiverWindow. Compound operations synchronize on it. */
    final Hashtable digest=new Hashtable();

    /**
     * Thread running {@link #run()}; null means "stop". Volatile because it is cleared by the
     * stopping thread and polled by the gossip thread as loop condition.
     */
    volatile Thread gossip_thread=null;

    /** Consumer of {@link #gossip_queue}. */
    GossipHandler gossip_handler=null;

    /** Holds GossipEntry instances waiting to be processed by the gossip handler. */
    final Queue gossip_queue=new Queue();

    /** Requests are dropped once gossip_queue holds this many elements. */
    int max_queue=100;

    /** Sleep time (ms) between gossips; recomputed on every round when {@link #dynamic} is set. */
    long gossip_interval=5000;

    /** Passed to Util.pickSubset() when choosing gossip targets. */
    double subset=0.1;

    /** Input to {@link #computeGossipInterval(int, double)}. */
    long desired_avg_gossip=30000;

    /** Current membership, replaced on each VIEW_CHANGE. */
    final Vector members=new Vector();

    /** Cache of gossips already processed, used to drop duplicates. */
    final List gossip_list=new List();

    /** Upper bound for the size of {@link #gossip_list}. */
    int max_gossip_cache=100;

    /** Number of most recent messages per sender that are kept when garbage collecting. */
    int gc_lag=30;

    /** Maps non-member gossip sender (Address) to the number of gossips received (Integer). */
    final Hashtable invalid_gossipers=new Hashtable();

    /** Number of tolerated gossips from a non-member before a NOT_MEMBER message is sent. */
    final int max_invalid_gossips=2;

    /** Seen-list of the gossip processed last by {@link #handleGossip(Gossip)}. */
    Vector seen_list=null;

    /** If true, non-members are told to leave, and we exit when told so ourselves. */
    boolean shun=false;

    /** If true, the gossip interval is computed from the group size instead of being fixed. */
    boolean dynamic=true;

    /** If true, the gossip thread skips sending one gossip after its next sleep. */
    boolean skip_sleep=true;

    /** If true, gossips are multicast to all members rather than unicast to a subset. */
    boolean mcast_gossip=true;


    public String getName() {
        return "PBCAST";
    }


    /**
     * @return the event types (as Integer) this protocol answers for layers above it
     */
    public Vector providedUpServices() {
        Vector retval=new Vector();
        retval.addElement(new Integer(Event.GET_DIGEST));
        retval.addElement(new Integer(Event.SET_DIGEST));
        retval.addElement(new Integer(Event.GET_DIGEST_STATE));
        return retval;
    }


    /** Stops the gossip thread and the gossip handler and marks the protocol non-operational. */
    public void stop() {
        stopGossipThread();
        stopGossipHandler();
        operational=false;
    }


    /**
     * Handles events travelling up the stack.
     * <p>
     * MSG events carrying a PbcastHeader are consumed here: MCAST_MSG is handed to
     * {@link #handleUpMessage(Message, PbcastHeader)}, all gossip-related types are queued for
     * the gossip handler. Everything else is passed up unchanged.
     *
     * @param evt the event received from the layer below
     */
    public void up(Event evt) {
        Message m;
        Object raw_hdr;
        PbcastHeader hdr;

        switch(evt.getType()) {
            case Event.MSG:
                m=(Message) evt.getArg();
                raw_hdr=m.getHeader(getName());

                // unicast message which is not ours: pass up, even when not yet operational
                if(m.getDest() != null && !m.getDest().isMulticastAddress()
                        && !(raw_hdr instanceof PbcastHeader))
                    break;

                if(!operational) {
                    if(log.isInfoEnabled()) log.info("event was discarded as I'm not yet operational. Event: " +
                            Util.printEvent(evt));
                    return;
                }

                if(!(raw_hdr instanceof PbcastHeader)) {
                    // raw_hdr may be null (no header at all): must not be dereferenced blindly
                    if(log.isErrorEnabled()) log.error("PbcastHeader expected, but received header of type " +
                            (raw_hdr != null? raw_hdr.getClass().getName() : "null") + " from " + m.getSrc() +
                            ". Passing event up unchanged");
                    break;
                }
                hdr=(PbcastHeader) m.removeHeader(getName());

                switch(hdr.type) {
                    case PbcastHeader.MCAST_MSG:
                        handleUpMessage(m, hdr);
                        return;

                    case PbcastHeader.GOSSIP:
                    case PbcastHeader.XMIT_REQ:
                    case PbcastHeader.XMIT_RSP:
                    case PbcastHeader.NOT_MEMBER:
                        enqueueGossipRequest(hdr, m);
                        return;

                    default:
                        if(log.isErrorEnabled()) log.error("type (" + hdr.type + ") of PbcastHeader not known !");
                        return;
                }

            case Event.SET_LOCAL_ADDRESS:
                local_addr=(Address) evt.getArg();
                break;
        }

        passUp(evt);
    }


    /** Adds a gossip-related request to gossip_queue, dropping it when the queue is full. */
    private void enqueueGossipRequest(PbcastHeader hdr, Message m) {
        try {
            if(gossip_queue.size() >= max_queue) {
                if(log.isWarnEnabled()) log.warn("gossip request " +
                        PbcastHeader.type2String(hdr.type) + " discarded because " +
                        "gossip_queue is full (number of elements=" + gossip_queue.size() + ')');
                return;
            }
            gossip_queue.add(new GossipEntry(hdr, m.getSrc(), m.getBuffer()));
        }
        catch(Exception ex) {
            // best effort: a request that cannot be queued is dropped
            if(log.isWarnEnabled()) log.warn("exception adding request to gossip_queue, details=" + ex);
        }
    }


    /**
     * Handles events travelling down the stack.
     * <p>
     * Multicast messages get a PbcastHeader with the next seqno, and a copy is stored in our own
     * receiver window. Digest requests are answered directly and not passed further down.
     * A view change updates membership and the digest and (re)starts the gossip machinery.
     *
     * @param evt the event received from the layer above
     */
    public void down(Event evt) {
        PbcastHeader hdr;
        Message m, copy;
        View v;
        Vector mbrs;
        NakReceiverWindow win;

        switch(evt.getType()) {

            case Event.MSG:
                m=(Message) evt.getArg();
                if(m.getDest() != null && !m.getDest().isMulticastAddress())
                    break; // unicast messages are passed down untouched

                hdr=new PbcastHeader(PbcastHeader.MCAST_MSG, seqno);
                m.putHeader(getName(), hdr);

                synchronized(digest) {
                    win=(NakReceiverWindow) digest.get(local_addr);
                    if(win == null) {
                        if(log.isInfoEnabled()) log.info("NakReceiverWindow for sender " + local_addr +
                                " not found. Creating new NakReceiverWindow starting at seqno=" + seqno);
                        win=new NakReceiverWindow(local_addr, seqno);
                        digest.put(local_addr, win);
                    }
                    copy=m.copy();
                    copy.setSrc(local_addr);
                    win.add(seqno, copy);
                }
                seqno++;
                break;

            case Event.SET_DIGEST:
                setDigest((Digest) evt.getArg());
                return;

            case Event.GET_DIGEST:
                passUp(new Event(Event.GET_DIGEST_OK, getDigest()));
                return;

            case Event.GET_DIGEST_STATE:
                passUp(new Event(Event.GET_DIGEST_STATE_OK, getDigest()));
                return;

            case Event.VIEW_CHANGE:
                v=(View) evt.getArg();
                if(v == null) {
                    if(log.isErrorEnabled()) log.error("view is null !");
                    break;
                }
                mbrs=v.getMembers();

                synchronized(members) {
                    members.removeAllElements();
                    for(int i=0; i < mbrs.size(); i++)
                        members.addElement(mbrs.elementAt(i));
                }

                adjustDigestToMembership(mbrs);

                if(dynamic) {
                    gossip_interval=computeGossipInterval(members.size(), desired_avg_gossip);

                    if(log.isInfoEnabled()) log.info("VIEW_CHANGE: gossip_interval=" + gossip_interval);
                    if(gossip_thread != null) {
                        // wake the gossip thread so that it picks up the new interval
                        skip_sleep=true;
                        gossip_thread.interrupt();
                    }
                }

                startGossipThread();
                startGossipHandler();
                break;

            case Event.BECOME_SERVER:
                operational=true;
                break;
        }

        passDown(evt);
    }


    /**
     * Removes the windows of senders which are not in mbrs (only if mbrs is non-empty) and
     * creates windows, starting at seqno 1, for members which do not have one yet.
     */
    private void adjustDigestToMembership(Vector mbrs) {
        Address key;
        NakReceiverWindow win;

        synchronized(digest) {
            if(mbrs.size() > 0) {
                // collect first, remove afterwards: a Hashtable must not be modified while one
                // of its enumerations is in use
                Vector departed=new Vector();
                for(Enumeration e=digest.keys(); e.hasMoreElements();) {
                    key=(Address) e.nextElement();
                    if(!mbrs.contains(key))
                        departed.addElement(key);
                }
                for(int i=0; i < departed.size(); i++) {
                    win=(NakReceiverWindow) digest.remove(departed.elementAt(i));
                    if(win != null)
                        win.reset();
                }
            }

            for(int i=0; i < mbrs.size(); i++) {
                key=(Address) mbrs.elementAt(i);
                if(!digest.containsKey(key))
                    digest.put(key, new NakReceiverWindow(key, 1));
            }
        }
    }


    /**
     * Body of the gossip thread: sleeps for gossip_interval, then sends a gossip, until
     * gossip_thread is set to null. When skip_sleep is set, the flag is cleared and no gossip
     * is sent for that round.
     */
    public void run() {
        while(gossip_thread != null) {
            if(dynamic) {
                gossip_interval=computeGossipInterval(members.size(), desired_avg_gossip);

                if(log.isInfoEnabled()) log.info("gossip_interval=" + gossip_interval);
            }

            Util.sleep(gossip_interval);
            if(skip_sleep)
                skip_sleep=false;
            else
                sendGossip();
        }
    }


    /**
     * Reads and removes the properties known to this protocol: dynamic, shun, gossip_interval,
     * mcast_gossip, subset, desired_avg_gossip, max_queue, max_gossip_cache and gc_lag.
     *
     * @param props the configuration; recognized keys are removed from it
     * @return false if unrecognized properties remain, true otherwise
     */
    public boolean setProperties(Properties props) {
        super.setProperties(props);
        String str;

        str=props.getProperty("dynamic");
        if(str != null) {
            dynamic=Boolean.valueOf(str).booleanValue();
            props.remove("dynamic");
        }

        str=props.getProperty("shun");
        if(str != null) {
            shun=Boolean.valueOf(str).booleanValue();
            props.remove("shun");
        }

        str=props.getProperty("gossip_interval");
        if(str != null) {
            gossip_interval=Long.parseLong(str);
            props.remove("gossip_interval");
        }

        str=props.getProperty("mcast_gossip");
        if(str != null) {
            mcast_gossip=Boolean.valueOf(str).booleanValue();
            props.remove("mcast_gossip");
        }

        str=props.getProperty("subset");
        if(str != null) {
            subset=Double.parseDouble(str);
            props.remove("subset");
        }

        str=props.getProperty("desired_avg_gossip");
        if(str != null) {
            desired_avg_gossip=Long.parseLong(str);
            props.remove("desired_avg_gossip");
        }

        str=props.getProperty("max_queue");
        if(str != null) {
            max_queue=Integer.parseInt(str);
            props.remove("max_queue");
        }

        str=props.getProperty("max_gossip_cache");
        if(str != null) {
            max_gossip_cache=Integer.parseInt(str);
            props.remove("max_gossip_cache");
        }

        str=props.getProperty("gc_lag");
        if(str != null) {
            gc_lag=Integer.parseInt(str);
            props.remove("gc_lag");
        }

        if(props.size() > 0) {
            // header and list go to the same stream so that they stay together
            System.err.println("PBCAST.setProperties(): the following properties are not recognized:");
            props.list(System.err);
            return false;
        }
        return true;
    }


    /**
     * Inserts a received multicast message into the window of its sender and passes up all
     * messages which the window releases. If we are the only member, messages older than
     * gc_lag are purged from the window right away.
     *
     * @param m   the received message (its PbcastHeader has already been removed)
     * @param hdr the header which was removed from m
     */
    void handleUpMessage(Message m, PbcastHeader hdr) {
        Address sender=m.getSrc();
        NakReceiverWindow win;
        Message tmpmsg;
        long msg_seqno=hdr.seqno;
        long stable_seqno;

        if(sender == null) {
            if(log.isErrorEnabled()) log.error("sender is null");
            return;
        }

        synchronized(digest) {
            win=(NakReceiverWindow) digest.get(sender);
            if(win == null) {
                if(log.isWarnEnabled()) log.warn("NakReceiverWindow for sender " + sender +
                        " not found. Creating new NakReceiverWindow starting at seqno=" + msg_seqno);
                win=new NakReceiverWindow(sender, msg_seqno);
                digest.put(sender, win);
            }

            // messages are stored with their header, so that they can be retransmitted as they are
            m.putHeader(getName(), hdr);
            win.add(msg_seqno, m);

            if(log.isInfoEnabled()) log.info("receiver window for " + sender + " is " + win);

            while((tmpmsg=win.remove()) != null) {
                tmpmsg.removeHeader(getName());
                passUp(new Event(Event.MSG, tmpmsg));
            }

            if(members.size() == 1) {
                stable_seqno=Math.max(msg_seqno - gc_lag, 0);
                if(stable_seqno > 0) {
                    if(log.isInfoEnabled()) log.info("deleting messages < " +
                            stable_seqno + " from " + sender);
                    win.stable(stable_seqno);
                }
            }
        }
    }


    /**
     * Builds a digest with the lowest and highest seqno seen for every sender we have a
     * window for.
     *
     * @return a new Digest instance, never null
     */
    Digest getDigest() {
        Digest ret;
        Address key;
        NakReceiverWindow win;

        // same lock as all other compound accesses, so that the snapshot is consistent
        synchronized(digest) {
            ret=new Digest(digest.size());
            for(Enumeration e=digest.keys(); e.hasMoreElements();) {
                key=(Address) e.nextElement();
                win=(NakReceiverWindow) digest.get(key);
                if(win == null)
                    continue;
                ret.add(key, win.getLowestSeen(), win.getHighestSeen());
            }
        }

        if(log.isInfoEnabled()) log.info("digest is " + ret);

        return ret;
    }


    /**
     * Replaces the current digest: all existing windows are reset and dropped, then a new window
     * starting at highest seqno + 1 is created for each sender listed in d.
     *
     * @param d the digest to install; a null digest is rejected and leaves the state untouched
     */
    void setDigest(Digest d) {
        NakReceiverWindow win;
        Address sender;
        long high_seqno;

        if(d == null) {
            if(log.isErrorEnabled()) log.error("digest is null, cannot set it");
            return;
        }

        synchronized(digest) {
            for(Enumeration e=digest.elements(); e.hasMoreElements();) {
                win=(NakReceiverWindow) e.nextElement();
                win.reset();
            }
            digest.clear();

            for(int i=0; i < d.size(); i++) {
                sender=d.senderAt(i);
                high_seqno=d.highSeqnoAt(i);
                if(sender == null) {
                    if(log.isErrorEnabled()) log.error("cannot set item because sender is null");
                    continue;
                }
                digest.put(sender, new NakReceiverWindow(sender, high_seqno + 1));
            }
        }
    }


    /**
     * @return one line per sender of the form "sender: highest seqno seen"
     */
    String printDigest() {
        Address key;
        NakReceiverWindow win;
        StringBuffer sb=new StringBuffer();

        synchronized(digest) {
            for(Enumeration e=digest.keys(); e.hasMoreElements();) {
                key=(Address) e.nextElement();
                win=(NakReceiverWindow) digest.get(key);
                if(win == null)
                    continue;
                sb.append(key + ": " + win.getHighestSeen() + '\n');
            }
        }
        return sb.toString();
    }


    /**
     * @return the string form of our own receiver window ("null" if there is none)
     */
    String printIncomingMessageQueue() {
        StringBuffer sb=new StringBuffer();
        sb.append(digest.get(local_addr));
        return sb.toString();
    }


    /** Starts the gossip thread as daemon, unless one is already set. */
    void startGossipThread() {
        if(gossip_thread == null) {
            gossip_thread=new Thread(this);
            gossip_thread.setDaemon(true);
            gossip_thread.start();
        }
    }


    /** Clears gossip_thread (which ends the loop in run()) and interrupts the thread if alive. */
    void stopGossipThread() {
        Thread tmp=gossip_thread;

        gossip_thread=null;
        if(tmp != null && tmp.isAlive())
            tmp.interrupt();
    }


    /** Creates and starts the gossip handler, unless one is already set. */
    void startGossipHandler() {
        if(gossip_handler == null) {
            gossip_handler=new GossipHandler(gossip_queue);
            gossip_handler.start();
        }
    }


    /** Stops and discards the gossip handler, if any. */
    void stopGossipHandler() {
        if(gossip_handler != null) {
            gossip_handler.stop();
            gossip_handler=null;
        }
    }


    /**
     * Sends a gossip containing a copy of the current digest and increments gossip_round.
     * <p>
     * With mcast_gossip, a single message without destination is sent; all other members are put
     * on the gossip's seen-list and no not-seen list is given. Otherwise one gossip is unicast
     * to each member of a subset picked by Util.pickSubset(), with all other members on the
     * not-seen list and ourselves on the seen-list.
     */
    void sendGossip() {
        Vector current_mbrs=(Vector) members.clone();
        Vector subset_mbrs;
        Gossip gossip;
        Message msg;
        Address dest;
        PbcastHeader hdr;

        if(local_addr != null)
            current_mbrs.remove(local_addr); // don't gossip to ourselves

        if(mcast_gossip) {
            gossip=new Gossip(local_addr, gossip_round, getDigest().copy(), null);
            for(int i=0; i < current_mbrs.size(); i++)
                gossip.addToSeenList((Address) current_mbrs.elementAt(i));
            hdr=new PbcastHeader(gossip, PbcastHeader.GOSSIP);
            msg=new Message(); // no destination set
            msg.putHeader(getName(), hdr);

            if(log.isInfoEnabled()) log.info("(from " + local_addr +
                    ") multicasting gossip " + gossip.shortForm() + " to all members");

            passDown(new Event(Event.MSG, msg));
        }
        else {
            subset_mbrs=Util.pickSubset(current_mbrs, subset);

            for(int i=0; i < subset_mbrs.size(); i++) {
                gossip=new Gossip(local_addr, gossip_round, getDigest().copy(), (Vector) current_mbrs.clone());
                gossip.addToSeenList(local_addr);
                hdr=new PbcastHeader(gossip, PbcastHeader.GOSSIP);
                dest=(Address) subset_mbrs.elementAt(i);
                msg=new Message(dest, null, null);
                msg.putHeader(getName(), hdr);

                if(log.isInfoEnabled()) log.info("(from " + local_addr +
                        ") sending gossip " + gossip.shortForm() + " to " + subset_mbrs);

                passDown(new Event(Event.MSG, msg));
            }
        }

        gossip_round++;
    }


    /**
     * Processes a received gossip:
     * <ol>
     * <li>Gossips from non-members are ignored (and the sender possibly shunned).
     * <li>Gossips already present in gossip_list are ignored; new ones are cached.
     * <li>A HEARD_FROM event with the gossip's seen-list is passed down.
     * <li>For each sender in the gossip's digest for which we are behind, the missing messages
     *     are requested from the gossip's sender with a single XMIT_REQ.
     * <li>If, after removing ourselves, the not-seen list is empty, the digest is used for
     *     garbage collection; otherwise the gossip is forwarded to a subset of that list.
     * </ol>
     *
     * @param gossip the received gossip; null (or a gossip without digest) is ignored
     */
    void handleGossip(Gossip gossip) {
        long my_low, my_high, their_low, their_high;
        Hashtable ht=null;
        Digest their_digest;
        Address sender;
        NakReceiverWindow win;
        Message msg;
        Address dest;
        Vector new_dests;
        PbcastHeader hdr;
        List missing_msgs;

        // must come first: everything below (including logging) dereferences the gossip
        if(gossip == null || gossip.digest == null) {
            if(log.isWarnEnabled()) log.warn("gossip is null or digest is null");
            return;
        }

        if(log.isInfoEnabled()) log.info("(from " + local_addr +
                ") received gossip " + gossip.shortForm() + " from " + gossip.sender);

        if(gossip.sender == null) {
            if(log.isErrorEnabled()) log.error("sender of gossip is null; " +
                    "don't know where to send XMIT_REQ to. Discarding gossip");
            return;
        }

        if(!members.contains(gossip.sender)) {
            if(log.isWarnEnabled()) log.warn("sender " + gossip.sender +
                    " is not a member. Gossip will not be processed");
            if(shun)
                shunInvalidGossiper(gossip.sender);
            return;
        }

        // make room in the cache, then drop the gossip if we have already seen it
        while(gossip_list.size() >= max_gossip_cache)
            gossip_list.removeFromHead();

        if(gossip_list.contains(gossip))
            return;
        gossip_list.add(gossip.copy());

        seen_list=gossip.getSeenList();
        if(seen_list.size() > 0)
            passDown(new Event(Event.HEARD_FROM, seen_list.clone()));

        // compare their digest against ours and collect what we are missing
        their_digest=gossip.digest;
        for(int i=0; i < their_digest.size(); i++) {
            sender=their_digest.senderAt(i);
            their_low=their_digest.lowSeqnoAt(i);
            their_high=their_digest.highSeqnoAt(i);

            if(their_low == 0 && their_high == 0)
                continue; // they have nothing from this sender

            win=(NakReceiverWindow) digest.get(sender);
            if(win == null) {
                if(log.isWarnEnabled()) log.warn("sender " + sender + " not found, skipping...");
                continue;
            }

            my_low=win.getLowestSeen();
            my_high=win.getHighestSeen();

            // Nothing is requested when we are not behind, or when my_low + 1 < their_low
            // (presumably because they can no longer supply those messages - TODO confirm)
            if(my_high >= their_high || my_low + 1 < their_low)
                continue;

            missing_msgs=win.getMissingMessages(my_high, their_high);
            if(missing_msgs != null) {
                if(log.isInfoEnabled())
                    log.info("asking " + gossip.sender + " for retransmission of " +
                            sender + ", missing messages: " + missing_msgs + "\nwin for " + sender + ":\n" + win + '\n');
                if(ht == null) ht=new Hashtable();
                ht.put(sender, missing_msgs);
            }
        }

        if(ht != null && ht.size() > 0) {
            hdr=new PbcastHeader(PbcastHeader.XMIT_REQ);
            hdr.xmit_reqs=ht;

            if(log.isInfoEnabled()) log.info("sending XMIT_REQ to " + gossip.sender);
            msg=new Message(gossip.sender, null, null);
            msg.putHeader(getName(), hdr);
            passDown(new Event(Event.MSG, msg));
        }

        gossip.removeFromNotSeenList(local_addr);
        if(gossip.sizeOfNotSeenList() == 0) {
            // nobody left to forward to: use the gossip's digest for garbage collection
            garbageCollect(gossip.digest);
            return;
        }

        new_dests=Util.pickSubset(gossip.getNotSeenList(), subset);

        if(log.isInfoEnabled()) log.info("(from " + local_addr +
                ") forwarding gossip " + gossip.shortForm() + " to " + new_dests);
        gossip.addToSeenList(local_addr);
        for(int i=0; i < new_dests.size(); i++) {
            dest=(Address) new_dests.elementAt(i);
            msg=new Message(dest, null, null);
            hdr=new PbcastHeader(gossip.copy(), PbcastHeader.GOSSIP);
            msg.putHeader(getName(), hdr);
            passDown(new Event(Event.MSG, msg));
        }
    }


    /**
     * Answers a retransmission request: for every sender in xmit_reqs for which we have a
     * window, one XMIT_RSP message with copies of the requested messages is sent to requester.
     *
     * @param requester the member to send the responses to; null is rejected
     * @param xmit_reqs maps sender (Address) to the List of requested messages
     */
    void handleXmitRequest(Address requester, Hashtable xmit_reqs) {
        NakReceiverWindow win;
        Address sender;
        List msgs, missing_msgs, xmit_msgs;
        Message msg;

        if(requester == null) {
            if(log.isErrorEnabled()) log.error("requester is null");
            return;
        }

        if(log.isInfoEnabled()) log.info("retransmission requests are " + printXmitReqs(xmit_reqs));

        for(Enumeration e=xmit_reqs.keys(); e.hasMoreElements();) {
            sender=(Address) e.nextElement();
            win=(NakReceiverWindow) digest.get(sender);
            if(win == null) {
                if(log.isWarnEnabled()) log.warn("sender " + sender +
                        " not found in my digest; skipping retransmit request !");
                continue;
            }

            missing_msgs=(List) xmit_reqs.get(sender);
            msgs=win.getMessagesInList(missing_msgs);

            // copies are sent, the originals stay in the window
            xmit_msgs=new List();
            for(Enumeration en=msgs.elements(); en.hasMoreElements();) {
                msg=((Message) en.nextElement()).copy();
                xmit_msgs.add(msg);
            }

            msg=new Message(requester, null, xmit_msgs);
            msg.putHeader(getName(), new PbcastHeader(PbcastHeader.XMIT_RSP));
            passDown(new Event(Event.MSG, msg));
        }
    }


    /**
     * Feeds retransmitted messages into the regular receive path. Messages without a
     * PbcastHeader are skipped.
     *
     * @param xmit_msgs list of Message instances; null is ignored
     */
    void handleXmitRsp(List xmit_msgs) {
        Message m;
        PbcastHeader hdr;

        if(xmit_msgs == null)
            return;

        for(Enumeration e=xmit_msgs.elements(); e.hasMoreElements();) {
            m=(Message) e.nextElement();
            hdr=(PbcastHeader) m.removeHeader(getName());
            if(hdr == null) {
                if(log.isWarnEnabled()) log.warn("header is null, ignoring message");
                continue;
            }
            if(log.isInfoEnabled()) log.info("received #" + hdr.seqno + ", type=" +
                    PbcastHeader.type2String(hdr.type) + ", msg=" + m);
            handleUpMessage(m, hdr);
        }
    }


    /**
     * @param xmit_reqs maps sender to requested messages, may be null
     * @return comma-separated "sender: messages" pairs, or "&lt;null&gt;" for a null table
     */
    String printXmitReqs(Hashtable xmit_reqs) {
        StringBuffer sb=new StringBuffer();
        Address key;
        boolean first=true;

        if(xmit_reqs == null)
            return "<null>";

        for(Enumeration e=xmit_reqs.keys(); e.hasMoreElements();) {
            key=(Address) e.nextElement();
            if(first)
                first=false;
            else
                sb.append(", ");
            sb.append(key + ": " + xmit_reqs.get(key));
        }
        return sb.toString();
    }


    /**
     * For every sender in gc for which we have a window, marks messages below
     * (highest seqno - gc_lag) as stable. Nothing is done when that value is not positive.
     *
     * @param gc the digest providing the highest seqno per sender
     */
    void garbageCollect(Digest gc) {
        Address sender;
        long stable_seqno;
        NakReceiverWindow win;

        for(int i=0; i < gc.size(); i++) {
            sender=gc.senderAt(i);
            win=(NakReceiverWindow) digest.get(sender);
            if(win == null) {
                if(log.isDebugEnabled()) log.debug("sender " + sender +
                        " not found in our message digest, skipping");
                continue;
            }
            stable_seqno=Math.max(gc.highSeqnoAt(i) - gc_lag, 0);
            if(stable_seqno <= 0)
                continue;

            if(log.isInfoEnabled()) log.info("(from " + local_addr +
                    ") GC: deleting messages < " + stable_seqno + " from " + sender);
            win.stable(stable_seqno);
        }
    }


    /**
     * Counts a gossip from a non-member. Once max_invalid_gossips gossips have been counted for
     * that sender, the next one triggers a NOT_MEMBER message to it and resets its count.
     *
     * @param invalid_gossiper the non-member which sent us a gossip
     */
    void shunInvalidGossiper(Address invalid_gossiper) {
        Integer count=(Integer) invalid_gossipers.get(invalid_gossiper);
        int num_pings=count != null? count.intValue() : 0;
        Message shun_msg;

        if(count != null && num_pings >= max_invalid_gossips) {
            if(log.isInfoEnabled()) log.info("sender " + invalid_gossiper +
                    " is not member of " + members + " ! Telling it to leave group");
            shun_msg=new Message(invalid_gossiper, null, null);
            shun_msg.putHeader(getName(), new PbcastHeader(PbcastHeader.NOT_MEMBER));
            passDown(new Event(Event.MSG, shun_msg));
            invalid_gossipers.remove(invalid_gossiper);
        }
        else {
            invalid_gossipers.put(invalid_gossiper, new Integer(num_pings + 1));
        }
    }


    /**
     * @param num_mbrs           number of members
     * @param desired_avg_gossip input scaling factor
     * @return a random value in [0, 2 * num_mbrs * desired_avg_gossip), see getRandom()
     */
    long computeGossipInterval(int num_mbrs, double desired_avg_gossip) {
        return getRandom((long) (num_mbrs * desired_avg_gossip * 2));
    }


    /**
     * @param range the exclusive upper bound
     * @return a random value in [0, range) for a positive range; 0 if range is 0 (the NaN
     *         produced by the modulo is converted to 0 by the cast)
     */
    long getRandom(long range) {
        return (long) ((Math.random() * range) % range);
    }


    /** Element of gossip_queue: a PbcastHeader plus the source and buffer of its message. */
    private static class GossipEntry {
        PbcastHeader hdr=null;
        Address sender=null;
        byte[] data=null;

        GossipEntry(PbcastHeader hdr, Address sender, byte[] data) {
            this.hdr=hdr;
            this.sender=sender;
            this.data=data;
        }

        public String toString() {
            // print the size: the default string form of a byte[] is just its identity
            return "hdr=" + hdr + ", sender=" + sender + ", data=" +
                    (data != null? data.length + " bytes" : "null");
        }
    }


    /**
     * Takes GossipEntry instances off the queue and dispatches them by header type, on its own
     * daemon thread.
     */
    private class GossipHandler implements Runnable {
        /** Handler thread; null means "stop". Volatile: cleared by stop(), polled by run(). */
        volatile Thread t=null;
        final Queue gossip_queue;


        GossipHandler(Queue q) {
            gossip_queue=q;
        }


        void start() {
            if(t == null) {
                t=new Thread(this, "PBCAST.GossipHandlerThread");
                t.setDaemon(true);
                t.start();
            }
        }


        /** Clears t and, if the thread was alive, closes the queue and interrupts the thread. */
        void stop() {
            Thread tmp=t;

            t=null;
            if(tmp != null && tmp.isAlive()) {
                if(gossip_queue != null)
                    gossip_queue.close(false);
                tmp.interrupt();
            }
        }


        public void run() {
            GossipEntry entry;
            PbcastHeader hdr;
            List xmit_msgs;
            byte[] data;

            while(t != null && gossip_queue != null) {
                try {
                    entry=(GossipEntry) gossip_queue.remove();
                    hdr=entry.hdr;
                    if(hdr == null) {
                        if(log.isErrorEnabled()) log.error("gossip entry has no PbcastHeader");
                        continue;
                    }

                    switch(hdr.type) {

                        case PbcastHeader.GOSSIP:
                            handleGossip(hdr.gossip);
                            break;

                        case PbcastHeader.XMIT_REQ:
                            if(hdr.xmit_reqs == null) {
                                if(log.isWarnEnabled()) log.warn("request is null !");
                                break;
                            }
                            handleXmitRequest(entry.sender, hdr.xmit_reqs);
                            break;

                        case PbcastHeader.XMIT_RSP:
                            data=entry.data;
                            if(data == null) {
                                if(log.isWarnEnabled()) log.warn("buffer is null (no xmitted msgs)");
                                break;
                            }
                            try {
                                xmit_msgs=(List) Util.objectFromByteBuffer(data);
                            }
                            catch(Exception ex) {
                                if(log.isErrorEnabled()) log.error(ex.getMessage());
                                break;
                            }
                            handleXmitRsp(xmit_msgs);
                            break;

                        case PbcastHeader.NOT_MEMBER:
                            if(shun) {
                                if(log.isInfoEnabled()) log.info("I am being shunned. Will leave and re-join");
                                passUp(new Event(Event.EXIT));
                            }
                            break;

                        default:
                            // skip the entry, but keep the handler alive for subsequent ones
                            if(log.isErrorEnabled()) log.error("type (" + hdr.type +
                                    ") of PbcastHeader not known !");
                            break;
                    }
                }
                catch(QueueClosedException closed) {
                    break;
                }
            }
        }
    }

}