1 3 package org.jgroups.blocks; 4 5 6 import org.apache.commons.logging.Log; 7 import org.apache.commons.logging.LogFactory; 8 import org.jgroups.*; 9 import org.jgroups.util.Util; 10 11 import java.io.Serializable ; 12 import java.util.StringTokenizer ; 13 import java.util.Vector ; 14 15 16 17 18 25 public class DistributedTree implements MessageListener, MembershipListener { 26 Node root=null; 27 final Vector listeners=new Vector (); 28 final Vector view_listeners=new Vector (); 29 final Vector members=new Vector (); 30 protected Channel channel=null; 31 protected RpcDispatcher disp=null; 32 String groupname="DistributedTreeGroup"; 33 String channel_properties="UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=0):" + 34 "PING(timeout=5000;num_initial_members=6):" + 35 "FD_SOCK:" + 36 "VERIFY_SUSPECT(timeout=1500):" + 37 "pbcast.STABLE(desired_avg_gossip=10000):" + 38 "pbcast.NAKACK(gc_lag=5;retransmit_timeout=3000;trace=true):" + 39 "UNICAST(timeout=5000):" + 40 "FRAG(down_thread=false;up_thread=false):" + 41 "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" + 42 "shun=false;print_local_addr=true):" + 43 "pbcast.STATE_TRANSFER(trace=true)"; 44 final long state_timeout=5000; 46 48 private boolean send_message = false; 49 50 protected static final Log log=LogFactory.getLog(DistributedTree.class); 51 52 53 54 public interface DistributedTreeListener { 55 void nodeAdded(String fqn, Serializable element); 56 57 void nodeRemoved(String fqn); 58 59 void nodeModified(String fqn, Serializable old_element, Serializable new_element); 60 } 61 62 63 public interface ViewListener { 64 void viewChange(Vector new_mbrs, Vector old_mbrs); 65 } 66 67 68 public DistributedTree() { 69 } 70 71 72 public DistributedTree(String groupname, String channel_properties) { 73 this.groupname=groupname; 74 if(channel_properties != null) 75 this.channel_properties=channel_properties; 76 } 77 78 90 public DistributedTree(PullPushAdapter adapter, Serializable id, long state_timeout) 91 throws ChannelException { 92 channel = (Channel)adapter.getTransport(); 93 disp=new RpcDispatcher(adapter, id, this, this, this); 94 channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); 95 boolean rc = channel.getState(null, state_timeout); 96 if(rc) { 97 if(log.isInfoEnabled()) log.info("state was retrieved successfully"); 98 } 99 else 100 if(log.isInfoEnabled()) log.info("state could not be retrieved (must be first member in group)"); 101 } 102 103 public Object getLocalAddress() { 104 return channel != null? channel.getLocalAddress() : null; 105 } 106 107 public void setDeadlockDetection(boolean flag) { 108 if(disp != null) 109 disp.setDeadlockDetection(flag); 110 } 111 112 public void start() throws Exception { 113 start(8000); 114 } 115 116 117 public void start(long timeout) throws Exception { 118 if(channel != null) return; 120 channel=new JChannel(channel_properties); 121 disp=new RpcDispatcher(channel, this, this, this); 122 channel.connect(groupname); 123 channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); 124 boolean rc=channel.getState(null, timeout); 125 if(rc) { 126 if(log.isInfoEnabled()) log.info("state was retrieved successfully"); 127 } 128 else 129 if(log.isInfoEnabled()) log.info("state could not be retrieved (must be first member in group)"); 130 } 131 132 133 public void stop() { 134 if(channel != null) { 135 channel.close(); 136 disp.stop(); 137 } 138 channel=null; 139 disp=null; 140 } 141 142 143 public void addDistributedTreeListener(DistributedTreeListener listener) { 144 if(!listeners.contains(listener)) 145 listeners.addElement(listener); 146 } 147 148 149 public void removeDistributedTreeListener(DistributedTreeListener listener) { 150 listeners.removeElement(listener); 151 } 152 153 154 public void addViewListener(ViewListener listener) { 155 if(!view_listeners.contains(listener)) 156 view_listeners.addElement(listener); 157 } 158 159 160 public void removeViewListener(ViewListener listener) { 161 view_listeners.removeElement(listener); 162 } 163 164 165 public void add(String fqn) { 166 if(send_message == true) { 169 try { 170 MethodCall call = new MethodCall("_add", new Object [] {fqn}, new String [] {String .class.getName()}); 171 disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, 0); 172 } 173 catch(Exception ex) { 174 if(log.isErrorEnabled()) log.error("exception=" + ex); 175 } 176 } 177 else { 178 _add(fqn); 179 } 180 } 181 182 public void add(String fqn, Serializable element) { 183 add(fqn, element, 0); 184 } 185 186 188 public void reset(String fqn, Serializable element) 189 { 190 reset(fqn, element, 0); 191 } 192 193 public void remove(String fqn) { 194 remove(fqn, 0); 195 } 196 197 public void add(String fqn, Serializable element, int timeout) { 198 if(send_message == true) { 201 try { 202 MethodCall call = new MethodCall("_add", new Object [] {fqn, element}, 203 new String [] {String .class.getName(), Serializable .class.getName()}); 204 disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout); 205 } 206 catch(Exception ex) { 207 if(log.isErrorEnabled()) log.error("exception=" + ex); 208 } 209 } 210 else { 211 _add(fqn, element); 212 } 213 } 214 215 217 public void reset(String fqn, Serializable element, int timeout) 218 { 219 if(send_message == true) { 222 try { 223 MethodCall call = new MethodCall("_reset", new Object [] {fqn, element}, 224 new String [] {String .class.getName(), Serializable .class.getName()}); 225 disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout); 226 } 227 catch(Exception ex) { 228 if(log.isErrorEnabled()) log.error("exception=" + ex); 229 } 230 } 231 else { 232 _add(fqn, element); 233 } 234 } 235 236 public void remove(String fqn, int timeout) { 237 if(send_message == true) { 240 try { 241 MethodCall call = new MethodCall("_remove", new Object [] {fqn}, new String [] {String .class.getName()}); 242 disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout); 243 } 244 catch(Exception ex) { 245 if(log.isErrorEnabled()) log.error("exception=" + ex); 246 } 247 } 248 else { 249 _remove(fqn); 250 } 251 } 252 253 254 public boolean exists(String fqn) { 255 if(fqn == null) 256 return false; 257 return findNode(fqn) == null? false : true; 258 } 259 260 261 public Serializable get(String fqn) { 262 Node n=null; 263 264 if(fqn == null) return null; 265 n=findNode(fqn); 266 if(n != null) { 267 return n.element; 268 } 269 return null; 270 } 271 272 273 public void set(String fqn, Serializable element) { 274 set(fqn, element, 0); 275 } 276 277 public void set(String fqn, Serializable element, int timeout) { 278 if(send_message == true) { 281 try { 282 MethodCall call = new MethodCall("_set", new Object [] {fqn, element}, 283 new String [] {String .class.getName(), Serializable .class.getName()}); 284 disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout); 285 } 286 catch(Exception ex) { 287 if(log.isErrorEnabled()) log.error("exception=" + ex); 288 } 289 } 290 else { 291 _set(fqn, element); 292 } 293 } 294 295 296 297 public Vector getChildrenNames(String fqn) { 298 Vector ret=new Vector (); 299 Node n; 300 301 if(fqn == null) return ret; 302 n=findNode(fqn); 303 if(n == null || n.children == null) return ret; 304 for(int i=0; i < n.children.size(); i++) 305 ret.addElement(((Node)n.children.elementAt(i)).name); 306 return ret; 307 } 308 309 310 public String print() { 311 StringBuffer sb=new StringBuffer (); 312 int indent=0; 313 314 if(root == null) 315 return "/"; 316 317 sb.append(root.print(indent)); 318 return sb.toString(); 319 } 320 321 322 323 Vector getChildren(String fqn) { 324 Node n; 325 326 if(fqn == null) return null; 327 n=findNode(fqn); 328 if(n == null) return null; 329 return n.children; 330 } 331 332 336 public String getGroupName() {return groupname;} 337 338 342 public Channel getChannel() {return channel;} 343 344 348 public int getGroupMembersNumber() {return members.size();} 349 350 351 352 353 354 355 public void _add(String fqn) { 356 _add(fqn, null); 357 } 358 359 360 public void _add(String fqn, Serializable element) { 361 Node curr, n; 362 StringTokenizer tok; 363 String child_name; 364 String tmp_fqn=""; 365 366 if(root == null) { 367 root=new Node("/", null); 368 notifyNodeAdded("/", null); 369 } 370 if(fqn == null) 371 return; 372 curr=root; 373 tok=new StringTokenizer (fqn, "/"); 374 375 while(tok.hasMoreTokens()) { 376 child_name=tok.nextToken(); 377 tmp_fqn=tmp_fqn + '/' + child_name; 378 n=curr.findChild(child_name); 379 if(n == null) { 380 n=new Node(child_name, null); 381 curr.addChild(n); 382 if(!tok.hasMoreTokens()) { 383 n.element=element; 384 notifyNodeAdded(tmp_fqn, element); 385 return; 386 } 387 else 388 notifyNodeAdded(tmp_fqn, null); 389 } 390 curr=n; 391 } 392 curr.element=element; 393 notifyNodeModified(fqn, null, element); 394 } 395 396 397 public void _remove(String fqn) { 398 Node curr, n; 399 StringTokenizer tok; 400 String child_name=null; 401 402 if(fqn == null || root == null) 403 return; 404 curr=root; 405 tok=new StringTokenizer (fqn, "/"); 406 407 while(tok.countTokens() > 1) { 408 child_name=tok.nextToken(); 409 n=curr.findChild(child_name); 410 if(n == null) return; 412 curr=n; 413 } 414 try { 415 child_name=tok.nextToken(); 416 if(child_name != null) { 417 n=curr.removeChild(child_name); 418 if(n != null) 419 notifyNodeRemoved(fqn); 420 } 421 } 422 catch(Exception ex) { 423 } 424 } 425 426 427 public void _set(String fqn, Serializable element) { 428 Node n; 429 Serializable old_el=null; 430 431 if(fqn == null || element == null) return; 432 n=findNode(fqn); 433 if(n == null) { 434 if(log.isErrorEnabled()) log.error("node " + fqn + " not found"); 435 return; 436 } 437 old_el=n.element; 438 n.element=element; 439 notifyNodeModified(fqn, old_el, element); 440 } 441 442 443 public void _reset(String fqn, Serializable element) { 444 Node n; 445 Serializable old_el=null; 446 447 if(fqn == null || element == null) return; 448 n=findNode(fqn); 449 if(n == null) { 450 _add(fqn, element); 451 } 452 old_el=n.element; 453 n.element=element; 454 notifyNodeModified(fqn, old_el, element); 455 } 456 457 458 459 460 461 462 463 464 465 466 public void receive(Message msg) { 467 } 468 469 470 public byte[] getState() { 471 Object copy=root != null? root.copy() : null; 472 try { 473 return Util.objectToByteBuffer(copy); 474 } 475 catch(Throwable ex) { 476 if(log.isErrorEnabled()) log.error("exception marshalling state: " + ex); 477 return null; 478 } 479 } 480 481 public void setState(byte[] data) { 482 Object new_state; 483 484 try { 485 new_state=Util.objectFromByteBuffer(data); 486 } 487 catch(Throwable ex) { 488 if(log.isErrorEnabled()) log.error("exception unmarshalling state: " + ex); 489 return; 490 } 491 if(new_state == null) return; 492 if(!(new_state instanceof Node)) { 493 if(log.isErrorEnabled()) log.error("object is not of type 'Node'"); 494 return; 495 } 496 root=((Node)new_state).copy(); 497 } 498 499 500 501 502 503 public void viewAccepted(View new_view) { 504 Vector new_mbrs=new_view.getMembers(); 505 506 if(new_mbrs != null) { 507 sendViewChangeNotifications(new_mbrs, members); members.removeAllElements(); 509 for(int i=0; i < new_mbrs.size(); i++) 510 members.addElement(new_mbrs.elementAt(i)); 511 } 512 if(members.size() > 1) { 515 send_message=true; 516 } 517 else { 518 send_message=false; 519 } 520 } 521 522 523 524 public void suspect(Address suspected_mbr) { 525 } 526 527 528 529 public void block() { 530 } 531 532 533 void sendViewChangeNotifications(Vector new_mbrs, Vector old_mbrs) { 534 Vector joined, left; 535 Object mbr; 536 537 if(view_listeners.size() == 0 || old_mbrs == null || new_mbrs == null) 538 return; 539 540 541 joined=new Vector (); 543 for(int i=0; i < new_mbrs.size(); i++) { 544 mbr=new_mbrs.elementAt(i); 545 if(!old_mbrs.contains(mbr)) 546 joined.addElement(mbr); 547 } 548 549 550 left=new Vector (); 552 for(int i=0; i < old_mbrs.size(); i++) { 553 mbr=old_mbrs.elementAt(i); 554 if(!new_mbrs.contains(mbr)) 555 left.addElement(mbr); 556 } 557 notifyViewChange(joined, left); 558 } 559 560 561 Node findNode(String fqn) { 562 Node curr=root; 563 StringTokenizer tok; 564 String child_name; 565 566 if(fqn == null || root == null) return null; 567 if("/".equals(fqn) || "".equals(fqn)) 568 return root; 569 570 tok=new StringTokenizer (fqn, "/"); 571 while(tok.hasMoreTokens()) { 572 child_name=tok.nextToken(); 573 curr=curr.findChild(child_name); 574 if(curr == null) return null; 575 } 576 return curr; 577 } 578 579 580 void notifyNodeAdded(String fqn, Serializable element) { 581 for(int i=0; i < listeners.size(); i++) 582 ((DistributedTreeListener)listeners.elementAt(i)).nodeAdded(fqn, element); 583 } 584 585 void notifyNodeRemoved(String fqn) { 586 for(int i=0; i < listeners.size(); i++) 587 ((DistributedTreeListener)listeners.elementAt(i)).nodeRemoved(fqn); 588 } 589 590 void notifyNodeModified(String fqn, Serializable old_element, Serializable new_element) { 591 for(int i=0; i < listeners.size(); i++) 592 ((DistributedTreeListener)listeners.elementAt(i)).nodeModified(fqn, old_element, new_element); 593 } 594 595 597 void notifyAllNodesCreated(Node curr, String tmp_fqn) { 598 Node n; 599 600 if(curr == null) return; 601 if(curr.name == null) { 602 if(log.isErrorEnabled()) log.error("curr.name is null"); 603 return; 604 } 605 606 if(curr.children != null) { 607 for(int i=0; i < curr.children.size(); i++) { 608 n=(Node)curr.children.elementAt(i); 609 System.out.println("*** nodeCreated(): tmp_fqn is " + tmp_fqn); 610 notifyNodeAdded(tmp_fqn, n.element); 611 notifyAllNodesCreated(n, tmp_fqn + '/' + n.name); 612 } 613 } 614 } 615 616 617 void notifyViewChange(Vector new_mbrs, Vector old_mbrs) { 618 for(int i=0; i < view_listeners.size(); i++) 619 ((ViewListener)view_listeners.elementAt(i)).viewChange(new_mbrs, old_mbrs); 620 } 621 622 623 private static class Node implements Serializable { 624 String name=null; 625 Vector children=null; 626 Serializable element=null; 627 628 629 Node() { 630 } 631 632 Node(String name, Serializable element) { 633 this.name=name; 634 this.element=element; 635 } 636 637 638 void addChild(String relative_name, Serializable element) { 639 if(relative_name == null) 640 return; 641 if(children == null) 642 children=new Vector (); 643 else { 644 if(!children.contains(relative_name)) 645 children.addElement(new Node(relative_name, element)); 646 } 647 } 648 649 650 void addChild(Node n) { 651 if(n == null) return; 652 if(children == null) 653 children=new Vector (); 654 if(!children.contains(n)) 655 children.addElement(n); 656 } 657 658 659 Node removeChild(String rel_name) { 660 Node n=findChild(rel_name); 661 662 if(n != null) 663 children.removeElement(n); 664 return n; 665 } 666 667 668 Node findChild(String relative_name) { 669 Node child; 670 671 if(children == null || relative_name == null) 672 return null; 673 for(int i=0; i < children.size(); i++) { 674 child=(Node)children.elementAt(i); 675 if(child.name == null) { 676 if(log.isErrorEnabled()) log.error("child.name is null for " + relative_name); 677 continue; 678 } 679 680 if(child.name.equals(relative_name)) 681 return child; 682 } 683 684 return null; 685 } 686 687 688 public boolean equals(Object other) { 689 return other != null && ((Node)other).name != null && name != null && name.equals(((Node)other).name); 690 } 691 692 693 Node copy() { 694 Node ret=new Node(name, element); 695 696 if(children != null) 697 ret.children=(Vector )children.clone(); 698 return ret; 699 } 700 701 702 String print(int indent) { 703 StringBuffer sb=new StringBuffer (); 704 boolean is_root=name != null && "/".equals(name); 705 706 for(int i=0; i < indent; i++) 707 sb.append(' '); 708 if(!is_root) { 709 if(name == null) 710 sb.append("/<unnamed>"); 711 else { 712 sb.append('/' + name); 713 } 715 } 716 sb.append('\n'); 717 if(children != null) { 718 if(is_root) 719 indent=0; 720 else 721 indent+=4; 722 for(int i=0; i < children.size(); i++) 723 sb.append(((Node)children.elementAt(i)).print(indent)); 724 } 725 return sb.toString(); 726 } 727 728 729 public String toString() { 730 if(element != null) 731 return "[name: " + name + ", element: " + element + ']'; 732 else 733 return "[name: " + name + ']'; 734 } 735 736 } 737 738 739 } 740 741 742 | Popular Tags |