package org.jboss.ha.framework.server;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Vector;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.Name;
import javax.naming.NameNotFoundException;
import javax.naming.Reference;
import javax.naming.StringRefAddr;
import javax.management.MBeanServer;

import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;

import org.jgroups.JChannel;
import org.jgroups.MergeView;
import org.jgroups.View;
import org.jgroups.Message;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MethodCall;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;

import org.jboss.invocation.MarshalledValueInputStream;
import org.jboss.invocation.MarshalledValueOutputStream;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.DistributedState;
import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer;
import org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener;
import org.jboss.ha.framework.interfaces.ClusterNode;

import org.jboss.naming.NonSerializableFactory;
import org.jboss.logging.Logger;

/**
 * {@link HAPartition} implementation built on top of a JGroups
 * {@link org.jgroups.blocks.RpcDispatcher}.
 *
 * <p>The partition keeps track of the current cluster view, dispatches
 * group RPCs to handlers registered by name, aggregates/distributes the
 * state of registered state-transfer subscribers, and notifies membership
 * listeners when the view changes.</p>
 */
public class HAPartitionImpl
   extends org.jgroups.blocks.RpcDispatcher
   implements org.jgroups.MessageListener, org.jgroups.MembershipListener,
              HAPartition
{
   /**
    * Marker value returned by {@link #handle(Message)} when no RPC handler
    * is registered under the requested name. Instances are filtered out of
    * the result lists returned by the <code>callMethodOn...</code> methods.
    */
   private static class NoHandlerForRPC implements Serializable
   {
      static final long serialVersionUID = -1263095408483622838L;
   }

   // Attributes ----------------------------------------------------

   /** RPC handlers, keyed by the name they were registered under. */
   protected HashMap rpcHandlers = new HashMap();
   /** State-transfer subscribers, keyed by the name they were registered under. */
   protected HashMap stateHandlers = new HashMap();

   /** Membership listeners notified synchronously from {@link #viewAccepted(View)}. */
   protected ArrayList listeners = new ArrayList();

   /** Membership listeners notified from the asynchronous notification thread. */
   protected ArrayList asynchListeners = new ArrayList();

   /** Queue of view change events consumed by the asynchronous notification thread. */
   protected LinkedQueue asynchViewChanges = new LinkedQueue();

   /** Thread running the {@link AsynchViewChangeHandler}; created in {@link #startPartition()}. */
   protected Thread asynchNotifyThread;

   /** Current view members as {@link ClusterNode} instances. */
   protected Vector members = null;
   /** Current view members as JGroups addresses. */
   protected Vector jgmembers = null;

   /** Timestamped log of partition events, appended to by {@link #logHistory(String)}. */
   public Vector history = null;

   /** Current view members except the local node, as {@link ClusterNode} instances. */
   protected Vector otherMembers = null;
   /** Current view members except the local node, as JGroups addresses. */
   protected Vector jgotherMembers = null;

   /** Name of this partition. */
   protected String partitionName;

   /** JGroups address of the local node; set in {@link #startPartition()}. */
   protected org.jgroups.stack.IpAddress localJGAddress = null;

   /** Name of the local node, as reported by {@link ClusterNode#getName()}. */
   protected String nodeName;

   /** The local node; set in {@link #startPartition()}. */
   protected ClusterNode me = null;

   /** Default timeout handed to <code>callRemoteMethods</code> for group RPCs. */
   protected long timeout = 60000;

   /** The JGroups channel this partition runs on. */
   protected JChannel channel;

   /** Replicant manager created in {@link #init()}. */
   protected DistributedReplicantManagerImpl replicantManager;

   /** Distributed state service created in {@link #init()}. */
   protected DistributedStateImpl dsManager;

   /** Partition-specific logger. */
   protected Logger log;
   /** Logger used for view/suspect messages when this node is the coordinator. */
   protected Logger clusterLifeCycleLog;

   /** Id of the most recently accepted view, or -1 if none was seen yet. */
   protected long currentViewId = -1;

   /** MBean server handed to the replicant manager and distributed state service. */
   protected MBeanServer server;

   /** Time, in milliseconds, to wait for the initial state transfer. */
   protected long state_transfer_timeout = 60000;

   // Static --------------------------------------------------------

   /**
    * Deserializes an object from a byte buffer using a
    * {@link MarshalledValueInputStream}.
    *
    * @param buffer the serialized form; may be <code>null</code>
    * @return the deserialized object, or <code>null</code> if
    *         <code>buffer</code> is <code>null</code>
    * @throws Exception if the stream cannot be created or read
    */
   public static Object objectFromByteBuffer(byte[] buffer) throws Exception
   {
      if (buffer == null)
         return null;

      ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
      MarshalledValueInputStream mvis = new MarshalledValueInputStream(bais);
      return mvis.readObject();
   }

   /**
    * Serializes an object into a byte buffer using a
    * {@link MarshalledValueOutputStream}.
    *
    * @param obj the object to serialize
    * @return the serialized form
    * @throws Exception if the stream cannot be created or written
    */
   public static byte[] objectToByteBuffer(Object obj) throws Exception
   {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      MarshalledValueOutputStream mvos = new MarshalledValueOutputStream(baos);
      mvos.writeObject(obj);
      mvos.flush();
      return baos.toByteArray();
   }

   /** @return the time, in milliseconds, to wait for the initial state transfer */
   public long getStateTransferTimeout()
   {
      return state_transfer_timeout;
   }

   /** @param state_transfer_timeout the time, in milliseconds, to wait for the initial state transfer */
   public void setStateTransferTimeout(long state_transfer_timeout)
   {
      this.state_transfer_timeout = state_transfer_timeout;
   }

   /** @return the default timeout used for group RPCs */
   public long getMethodCallTimeout()
   {
      return timeout;
   }

   /** @param timeout the default timeout used for group RPCs */
   public void setMethodCallTimeout(long timeout)
   {
      this.timeout = timeout;
   }

   // Constructors --------------------------------------------------

   /**
    * Creates a partition and remembers the MBean server that is later handed
    * to the replicant manager and distributed state service.
    *
    * @param partitionName name of the partition
    * @param channel JGroups channel to run on
    * @param deadlock_detection passed through to the <code>RpcDispatcher</code> constructor
    * @param server MBean server used by the services created in {@link #init()}
    * @throws Exception declared for subclasses/superclass; not thrown by visible code
    */
   public HAPartitionImpl(String partitionName, org.jgroups.JChannel channel, boolean deadlock_detection, MBeanServer server) throws Exception
   {
      this(partitionName, channel, deadlock_detection);
      this.server = server;
   }

   /**
    * Creates a partition without an MBean server.
    *
    * @param partitionName name of the partition
    * @param channel JGroups channel to run on
    * @param deadlock_detection passed through to the <code>RpcDispatcher</code> constructor
    * @throws Exception declared for subclasses/superclass; not thrown by visible code
    */
   public HAPartitionImpl(String partitionName, org.jgroups.JChannel channel, boolean deadlock_detection) throws Exception
   {
      super(channel, null, null, new Object(), deadlock_detection);
      this.log = Logger.getLogger(HAPartition.class.getName() + "." + partitionName);
      this.clusterLifeCycleLog = Logger.getLogger(HAPartition.class.getName() + ".lifecycle." + partitionName);
      this.channel = channel;
      this.partitionName = partitionName;
      this.history = new Vector();
      logHistory("Partition object created");
   }

   // Public --------------------------------------------------------

   /**
    * Registers this object as membership and message listener, creates and
    * initializes the replicant manager and distributed state service, and
    * binds the partition in JNDI under <code>/HAPartition/&lt;partitionName&gt;</code>.
    *
    * @throws Exception if a sub-service fails to initialize or the JNDI bind fails
    */
   public void init() throws Exception
   {
      log.info("Initializing");
      logHistory("Initializing partition");

      log.debug("setMembershipListener");
      setMembershipListener(this);
      log.debug("setMessageListener");
      setMessageListener(this);

      log.debug("create replicant manager");
      this.replicantManager = new DistributedReplicantManagerImpl(this, this.server);
      log.debug("init replicant manager");
      this.replicantManager.init();
      log.debug("bind replicant manager");

      log.debug("create distributed state");
      this.dsManager = new DistributedStateImpl(this, this.server);
      log.debug("init distributed state service");
      this.dsManager.init();
      log.debug("bind distributed state service");

      Context ctx = new InitialContext();
      this.bind("/HAPartition/" + partitionName, this, HAPartitionImpl.class, ctx);

      log.debug("done initing.");
   }

   /**
    * Reads the local address and current view from the channel, verifies the
    * local node name is unique, fetches the group state, starts the
    * sub-services and launches the asynchronous view-change notifier thread.
    *
    * @throws Exception if the node name is not unique or a sub-service fails to start
    */
   public void startPartition() throws Exception
   {
      logHistory("Starting partition");

      log.debug("get nodeName");
      this.localJGAddress = (IpAddress) channel.getLocalAddress();
      this.me = new ClusterNode(this.localJGAddress);
      this.nodeName = this.me.getName();

      log.debug("Get current members");
      View view = channel.getView();
      this.jgmembers = (Vector) view.getMembers().clone();
      this.members = translateAddresses(this.jgmembers);
      log.info("Number of cluster members: " + members.size());
      // The loop condition used to be "m > members.size()", which never
      // executed, so the individual members were never logged.
      for (int m = 0; m < members.size(); m++)
      {
         Object node = members.get(m);
         log.debug(node);
      }

      this.jgotherMembers = (Vector) view.getMembers().clone();
      this.jgotherMembers.remove(channel.getLocalAddress());
      this.otherMembers = translateAddresses(this.jgotherMembers);
      log.info("Other members: " + this.otherMembers.size());

      verifyNodeIsUnique(view.getMembers());

      this.currentViewId = view.getVid().getId();

      log.info("Fetching state (will wait for " + this.state_transfer_timeout + " milliseconds):");
      boolean rc = channel.getState(null, this.state_transfer_timeout);
      if (rc)
         log.debug("State was retrieved successfully");
      else
         log.debug("State could not be retrieved, (must be first member of group)");

      this.replicantManager.start();
      this.dsManager.start();

      AsynchViewChangeHandler asynchHandler = new AsynchViewChangeHandler();
      asynchNotifyThread = new Thread(asynchHandler, "AsynchHAMembershipListener Thread");
      asynchNotifyThread.start();
   }

   /**
    * Interrupts the asynchronous notifier thread, stops the sub-services and
    * disconnects the channel. Each step is attempted even if a previous one
    * failed; failures are logged rather than propagated.
    *
    * @throws Exception declared by the interface; visible code logs failures instead
    */
   public void closePartition() throws Exception
   {
      logHistory("Closing partition");
      log.info("Closing partition " + partitionName);

      try
      {
         asynchNotifyThread.interrupt();
      }
      catch (Exception e)
      {
         log.warn("Failed to interrupte asynchNotifyThread", e);
      }

      try
      {
         this.replicantManager.stop();
      }
      catch (Exception e)
      {
         log.error("operation failed", e);
      }

      try
      {
         this.dsManager.stop();
      }
      catch (Exception e)
      {
         log.error("operation failed", e);
      }

      try
      {
         channel.disconnect();
      }
      catch (Exception e)
      {
         log.error("operation failed", e);
      }

      log.info("Partition " + partitionName + " closed.");
   }

   /**
    * Destroys the sub-services, closes the channel and removes the JNDI
    * binding created by {@link #init()}. Sub-service and channel failures are
    * logged; JNDI failures propagate to the caller.
    *
    * @throws Exception if the JNDI unbind fails
    */
   public void destroyPartition() throws Exception
   {
      try
      {
         this.replicantManager.destroy();
      }
      catch (Exception e)
      {
         log.error("operation failed", e);
      }

      try
      {
         this.dsManager.destroy();
      }
      catch (Exception e)
      {
         log.error("operation failed", e);
      }

      try
      {
         channel.close();
      }
      catch (Exception e)
      {
         log.error("operation failed", e);
      }

      String boundName = "/HAPartition/" + partitionName;

      InitialContext ctx = new InitialContext();
      try
      {
         ctx.unbind(boundName);
      }
      finally
      {
         ctx.close();
      }
      NonSerializableFactory.unbind(boundName);

      log.info("Partition " + partitionName + " destroyed.");
   }

   // MessageListener implementation --------------------------------

   /**
    * Collects the current state of every registered state-transfer
    * subscriber into a map keyed by subscriber name and serializes it.
    *
    * @return the serialized state map, or <code>null</code> if collecting or
    *         serializing the state failed (the failure is logged)
    */
   public byte[] getState()
   {
      logHistory("getState called on partition");
      boolean debug = log.isDebugEnabled();

      log.debug("getState called.");
      try
      {
         HashMap state = new HashMap();
         Iterator entries = stateHandlers.entrySet().iterator();
         while (entries.hasNext())
         {
            Map.Entry entry = (Map.Entry) entries.next();
            String key = (String) entry.getKey();
            HAPartition.HAPartitionStateTransfer subscriber =
               (HAPartition.HAPartitionStateTransfer) entry.getValue();
            if (debug)
               log.debug("getState for " + key);
            state.put(key, subscriber.getCurrentState());
         }
         return objectToByteBuffer(state);
      }
      catch (Exception ex)
      {
         log.error("getState failed", ex);
      }
      return null;
   }

   /**
    * Deserializes a state map produced by {@link #getState()} and hands each
    * entry to the subscriber registered under the same name. Entries without
    * a matching subscriber are skipped. Failures are logged, not propagated.
    *
    * @param obj the serialized state map; <code>null</code> is ignored
    */
   public void setState(byte[] obj)
   {
      logHistory("setState called on partition");
      try
      {
         log.debug("setState called");
         if (obj == null)
         {
            log.debug("state is null");
            return;
         }

         // Memory figures are only used for the diagnostic message below.
         int state_size = obj.length;
         Runtime rt = Runtime.getRuntime();
         long used_mem_before = rt.totalMemory() - rt.freeMemory();

         HashMap state = (HashMap) objectFromByteBuffer(obj);
         Iterator entries = state.entrySet().iterator();
         while (entries.hasNext())
         {
            Map.Entry entry = (Map.Entry) entries.next();
            String key = (String) entry.getKey();
            log.debug("setState for " + key);
            Object someState = entry.getValue();
            HAPartition.HAPartitionStateTransfer subscriber =
               (HAPartition.HAPartitionStateTransfer) stateHandlers.get(key);
            if (subscriber != null)
            {
               subscriber.setCurrentState((java.io.Serializable) someState);
            }
            else
            {
               log.debug("There is no stateHandler for: " + key);
            }
         }

         long used_mem_after = rt.totalMemory() - rt.freeMemory();
         log.debug("received a state of " + state_size + " bytes; expanded memory by " +
                   (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
                   ", used memory after: " + used_mem_after + ")");
      }
      catch (Exception ex)
      {
         log.error("setState failed", ex);
      }
   }

   /**
    * Plain group messages are ignored by this implementation.
    *
    * @param msg the received message
    */
   public void receive(org.jgroups.Message msg)
   { /* intentionally empty */ }

   // MembershipListener implementation -----------------------------

   /**
    * Records and logs that a member is suspected.
    *
    * @param suspected_mbr address of the suspected member; may be <code>null</code>
    */
   public void suspect(org.jgroups.Address suspected_mbr)
   {
      logHistory("Node suspected: " + (suspected_mbr == null ? "null" : suspected_mbr.toString()));
      if (isCurrentNodeCoordinator())
         clusterLifeCycleLog.info("Suspected member: " + suspected_mbr);
      else
         log.info("Suspected member: " + suspected_mbr);
   }

   /** No action is taken when the channel asks members to block. */
   public void block() {}

   /**
    * Updates the cached membership from the new view and notifies listeners.
    *
    * <p>The first view received (when no members are cached yet) only
    * initializes the membership; no notification is sent. For later views an
    * event is queued for the asynchronous listeners and the synchronous
    * listeners are called directly. Any failure is logged, not propagated.</p>
    *
    * @param newView the view installed by the group
    */
   public void viewAccepted(View newView)
   {
      try
      {
         this.currentViewId = newView.getVid().getId();

         this.jgotherMembers = (Vector) newView.getMembers().clone();
         this.jgotherMembers.remove(channel.getLocalAddress());
         this.otherMembers = translateAddresses(this.jgotherMembers);
         Vector translatedNewView = translateAddresses((Vector) newView.getMembers().clone());
         logHistory("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
                    " (old view: " + this.members + " )");

         // Keep the previous membership around to compute the delta.
         Vector oldMembers = this.members;

         Vector newjgMembers = (Vector) newView.getMembers().clone();
         Vector newMembers = translateAddresses(newjgMembers);
         if (oldMembers == null)
         {
            // Initial view: nothing to compare against, so nobody is notified.
            this.members = newMembers;
            this.jgmembers = newjgMembers;
            log.debug("ViewAccepted: initial members set");
            return;
         }
         this.members = newMembers;
         this.jgmembers = newjgMembers;

         int difference = newMembers.size() - oldMembers.size();

         if (isCurrentNodeCoordinator())
            clusterLifeCycleLog.info("New cluster view for partition " + this.partitionName + " (id: " +
                                     this.currentViewId + ", delta: " + difference + ") : " + this.members);
         else
            log.info("New cluster view for partition " + this.partitionName + ": " +
                     this.currentViewId + " (" + this.members + " delta: " + difference + ")");

         ViewChangeEvent event = new ViewChangeEvent();
         event.viewId = currentViewId;
         event.allMembers = translatedNewView;
         event.deadMembers = getDeadMembers(oldMembers, event.allMembers);
         event.newMembers = getNewMembers(oldMembers, event.allMembers);
         event.originatingGroups = null;
         if (newView instanceof MergeView)
         {
            MergeView mergeView = (MergeView) newView;
            event.originatingGroups = mergeView.getSubgroups();
         }

         // Report the previous size; this.members already holds the new view here.
         log.debug("membership changed from " + oldMembers.size() + " to "
                   + event.allMembers.size());
         this.asynchViewChanges.put(event);

         this.notifyListeners(listeners, event.viewId, event.allMembers,
                              event.deadMembers, event.newMembers, event.originatingGroups);
      }
      catch (Exception ex)
      {
         log.error("ViewAccepted failed", ex);
      }
   }

   // HAPartition implementation ------------------------------------

   /** @return the name of the local node, or <code>null</code> before {@link #startPartition()} */
   public String getNodeName()
   {
      return nodeName;
   }

   /** @return the name of this partition */
   public String getPartitionName()
   {
      return partitionName;
   }

   /** @return the replicant manager created in {@link #init()} */
   public DistributedReplicantManager getDistributedReplicantManager()
   {
      return replicantManager;
   }

   /** @return the distributed state service created in {@link #init()} */
   public DistributedState getDistributedStateService()
   {
      return this.dsManager;
   }

   /** @return the id of the most recently accepted view, or -1 if none */
   public long getCurrentViewId()
   {
      return this.currentViewId;
   }

   /** @return a new vector holding the names of the current members */
   public Vector getCurrentView()
   {
      Vector result = new Vector(this.members.size());
      for (int i = 0; i < members.size(); i++)
      {
         result.add(((ClusterNode) members.elementAt(i)).getName());
      }
      return result;
   }

   /** @return a new array holding the current members */
   public ClusterNode[] getClusterNodes()
   {
      ClusterNode[] nodes = new ClusterNode[this.members.size()];
      this.members.toArray(nodes);
      return nodes;
   }

   /** @return the local node, or <code>null</code> before {@link #startPartition()} */
   public ClusterNode getClusterNode()
   {
      return me;
   }

   /**
    * @return <code>true</code> if the local node is the first member of the
    *         current view; <code>false</code> otherwise or if the membership
    *         or the local node is not known yet
    */
   public boolean isCurrentNodeCoordinator()
   {
      if (this.members == null || this.members.size() == 0 || this.me == null)
         return false;
      return this.members.elementAt(0).equals(this.me);
   }

   // ***************************
   // RPC multicast communication
   // ***************************

   /**
    * Registers an object to receive RPCs addressed to <code>objName</code>.
    *
    * @param objName name under which the handler is reachable
    * @param subscriber object on which incoming method calls are invoked
    */
   public void registerRPCHandler(String objName, Object subscriber)
   {
      rpcHandlers.put(objName, subscriber);
   }

   /**
    * Removes the handler registered under <code>objName</code>.
    *
    * @param objName name the handler was registered under
    * @param subscriber unused; removal is by name only
    */
   public void unregisterRPCHandler(String objName, Object subscriber)
   {
      rpcHandlers.remove(objName);
   }

   /**
    * Calls a method on the handlers registered under <code>objName</code>
    * across the cluster, using the default timeout and no explicit
    * parameter types.
    *
    * @see #callMethodOnCluster(String, String, Object[], Class[], boolean, long)
    */
   public ArrayList callMethodOnCluster(String objName, String methodName,
      Object[] args, boolean excludeSelf) throws Exception
   {
      return callMethodOnCluster(objName, methodName, args, null, excludeSelf);
   }

   /**
    * Calls a method on the handlers registered under <code>objName</code>
    * across the cluster, using the default timeout.
    *
    * @see #callMethodOnCluster(String, String, Object[], Class[], boolean, long)
    */
   public ArrayList callMethodOnCluster(String objName, String methodName,
      Object[] args, Class[] types, boolean excludeSelf) throws Exception
   {
      return callMethodOnCluster(objName, methodName, args, types, excludeSelf, this.timeout);
   }

   /**
    * Calls a method on the handlers registered under <code>objName</code>
    * across the cluster and waits for all responses.
    *
    * @param objName name the target handlers are registered under
    * @param methodName method to invoke on the handlers
    * @param args method arguments
    * @param types parameter types, or <code>null</code> to let the
    *        <code>MethodCall</code> be built from the arguments only
    * @param excludeSelf if <code>true</code> the call is sent only to the
    *        other members; otherwise to the whole group
    * @param methodTimeout timeout handed to <code>callRemoteMethods</code>
    * @return the received response values, without "no handler" markers
    * @throws Exception if the remote call fails
    */
   public ArrayList callMethodOnCluster(String objName, String methodName,
      Object[] args, Class[] types, boolean excludeSelf, long methodTimeout) throws Exception
   {
      boolean trace = log.isTraceEnabled();
      MethodCall m = createMethodCall(objName, methodName, args, types);
      RspList rsp = null;

      if (excludeSelf)
      {
         if (trace)
         {
            log.trace("callMethodOnCluster(true), objName=" + objName
               + ", methodName=" + methodName + ", members=" + jgotherMembers);
         }
         rsp = this.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_ALL, methodTimeout);
      }
      else
      {
         if (trace)
         {
            log.trace("callMethodOnCluster(false), objName=" + objName
               + ", methodName=" + methodName + ", members=" + members);
         }
         rsp = this.callRemoteMethods(null, m, GroupRequest.GET_ALL, methodTimeout);
      }

      return processResponseList(rsp, trace);
   }

   /**
    * Calls a method on the coordinator node only, using the default timeout.
    *
    * @see #callMethodOnCoordinatorNode(String, String, Object[], Class[], boolean, long)
    */
   public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
      Object[] args, Class[] types, boolean excludeSelf) throws Exception
   {
      return callMethodOnCoordinatorNode(objName, methodName, args, types, excludeSelf, this.timeout);
   }

   /**
    * Calls a method on the handler registered under <code>objName</code> on
    * the coordinator node (the first member of the current view).
    *
    * @param objName name the target handler is registered under
    * @param methodName method to invoke on the handler
    * @param args method arguments
    * @param types parameter types, or <code>null</code>
    * @param excludeSelf if <code>true</code> and the local node is the
    *        coordinator, the destination list is left empty
    * @param methodTimeout timeout handed to <code>callRemoteMethods</code>
    * @return the received response values, without "no handler" markers
    * @throws Exception if the remote call fails
    */
   public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
      Object[] args, Class[] types, boolean excludeSelf, long methodTimeout) throws Exception
   {
      boolean trace = log.isTraceEnabled();
      MethodCall m = createMethodCall(objName, methodName, args, types);

      if (trace)
      {
         log.trace("callMethodOnCoordinatorNode(false), objName=" + objName
            + ", methodName=" + methodName);
      }

      // Address the coordinator unless it is this node and self is excluded.
      Vector coordinatorOnly = new Vector();
      if (!isCurrentNodeCoordinator() || !excludeSelf)
         coordinatorOnly.addElement(this.jgmembers.elementAt(0));

      RspList rsp = this.callRemoteMethods(coordinatorOnly, m, GroupRequest.GET_ALL, methodTimeout);

      return processResponseList(rsp, trace);
   }

   /**
    * Calls a method across the cluster without waiting for responses and
    * without explicit parameter types.
    *
    * @see #callAsynchMethodOnCluster(String, String, Object[], Class[], boolean)
    */
   public void callAsynchMethodOnCluster(String objName, String methodName,
      Object[] args, boolean excludeSelf) throws Exception
   {
      callAsynchMethodOnCluster(objName, methodName, args, null, excludeSelf);
   }

   /**
    * Calls a method on the handlers registered under <code>objName</code>
    * across the cluster without waiting for responses
    * (<code>GroupRequest.GET_NONE</code>).
    *
    * @param objName name the target handlers are registered under
    * @param methodName method to invoke on the handlers
    * @param args method arguments
    * @param types parameter types, or <code>null</code>
    * @param excludeSelf if <code>true</code> the call is sent only to the
    *        other members; otherwise to the whole group
    * @throws Exception if the remote call fails
    */
   public void callAsynchMethodOnCluster(String objName, String methodName,
      Object[] args, Class[] types, boolean excludeSelf) throws Exception
   {
      boolean trace = log.isTraceEnabled();
      MethodCall m = createMethodCall(objName, methodName, args, types);

      if (excludeSelf)
      {
         if (trace)
         {
            log.trace("callAsynchMethodOnCluster(true), objName=" + objName
               + ", methodName=" + methodName + ", members=" + jgotherMembers);
         }
         this.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, timeout);
      }
      else
      {
         if (trace)
         {
            log.trace("callAsynchMethodOnCluster(false), objName=" + objName
               + ", methodName=" + methodName + ", members=" + members);
         }
         this.callRemoteMethods(null, m, GroupRequest.GET_NONE, timeout);
      }
   }

   // *************************
   // State transfer management
   // *************************

   /**
    * Registers a subscriber whose state is included in {@link #getState()}
    * and which receives its part of the state in {@link #setState(byte[])}.
    *
    * @param objectName key under which the subscriber's state is stored
    * @param subscriber the state provider/consumer
    */
   public void subscribeToStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
   {
      stateHandlers.put(objectName, subscriber);
   }

   /**
    * Removes the state-transfer subscriber registered under <code>objectName</code>.
    *
    * @param objectName key the subscriber was registered under
    * @param subscriber unused; removal is by name only
    */
   public void unsubscribeFromStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
   {
      stateHandlers.remove(objectName);
   }

   // *************************
   // Group Membership listeners
   // *************************

   /**
    * Adds a listener that is notified synchronously on view changes.
    *
    * @param listener the listener to add
    */
   public void registerMembershipListener(HAMembershipListener listener)
   {
      synchronized (this.listeners)
      {
         this.listeners.add(listener);
      }
   }

   /**
    * Removes a listener previously added with
    * {@link #registerMembershipListener(HAMembershipListener)}.
    *
    * @param listener the listener to remove
    */
   public void unregisterMembershipListener(HAMembershipListener listener)
   {
      synchronized (this.listeners)
      {
         this.listeners.remove(listener);
      }
   }

   // org.jgroups.RpcDispatcher overrides ---------------------------

   /**
    * Handles an incoming RPC message.
    *
    * <p>The message body must be a {@link MethodCall} whose name has the
    * form <code>handlerName.methodName</code>. The call is invoked on the
    * handler registered under <code>handlerName</code>.</p>
    *
    * @param req the request message
    * @return the invocation result; the thrown <code>Throwable</code> if the
    *         invocation failed; a "no handler" marker if no handler is
    *         registered or the method name is malformed; <code>null</code> if
    *         the message is empty or does not contain a <code>MethodCall</code>
    */
   public Object handle(Message req)
   {
      Object body = null;
      Object retval = null;
      MethodCall method_call = null;
      boolean trace = log.isTraceEnabled();

      if (trace)
         log.trace("Partition " + partitionName + " received msg");
      if (req == null || req.getBuffer() == null)
      {
         log.warn("RpcProtocol.Handle(): message or message buffer is null !");
         return null;
      }

      try
      {
         body = Util.objectFromByteBuffer(req.getBuffer());
      }
      catch (Exception e)
      {
         log.warn("RpcProtocol.Handle(): " + e);
         return null;
      }

      if (body == null || !(body instanceof MethodCall))
      {
         log.warn("RpcProtocol.Handle(): message does not contain a MethodCall object !");
         return null;
      }

      method_call = (MethodCall) body;
      String methodName = method_call.getName();

      if (trace)
         log.trace("pre methodName: " + methodName);

      // The name must be "<handler>.<method>"; without a separator there is
      // no handler to look up, so answer like an unknown handler instead of
      // failing with a StringIndexOutOfBoundsException/NullPointerException.
      int idx = (methodName == null) ? -1 : methodName.lastIndexOf('.');
      if (idx < 0)
      {
         log.warn("RpcProtocol.Handle(): method name has no handler prefix: " + methodName);
         return new NoHandlerForRPC();
      }
      String handlerName = methodName.substring(0, idx);
      String newMethodName = methodName.substring(idx + 1);

      if (trace)
      {
         log.trace("handlerName: " + handlerName + " methodName: " + newMethodName);
         log.trace("Handle: " + methodName);
      }

      // Strip the handler prefix before invoking on the handler object.
      method_call.setName(newMethodName);
      Object handler = rpcHandlers.get(handlerName);
      if (handler == null)
      {
         if (trace)
            log.trace("No rpc handler registered under: " + handlerName);
         return new NoHandlerForRPC();
      }

      try
      {
         retval = method_call.invoke(handler);
         if (trace)
            log.trace("rpc call return value: " + retval);
      }
      catch (Throwable t)
      {
         if (trace)
            log.trace("rpc call threw exception", t);
         // The failure is shipped back to the caller as the return value.
         retval = t;
      }

      return retval;
   }

   // Protected -----------------------------------------------------

   /**
    * Checks that no other member of the view publishes the same additional
    * data (unique name) as the local node.
    *
    * @param javaGroupIpAddresses the view members as {@link IpAddress} instances
    * @throws Exception if another member carries the same additional data
    */
   protected void verifyNodeIsUnique(Vector javaGroupIpAddresses) throws Exception
   {
      byte[] localUniqueName = this.localJGAddress.getAdditionalData();
      if (localUniqueName == null)
      {
         log.warn("No additional information has been found in the JavaGroup address: " +
                  "make sure you are running with a correct version of JGroups and that the protocol " +
                  " you are using supports the 'additionalData' behaviour");
         // Nothing to compare against; continuing would only raise a NullPointerException.
         return;
      }

      for (int i = 0; i < javaGroupIpAddresses.size(); i++)
      {
         IpAddress address = (IpAddress) javaGroupIpAddresses.elementAt(i);
         if (!address.equals(this.localJGAddress))
         {
            // Compare array contents: byte[].equals() is reference equality
            // and would never detect a duplicate.
            if (Arrays.equals(localUniqueName, address.getAdditionalData()))
               throw new Exception("Local node removed from cluster (" + this.localJGAddress + "): another node (" + address + ") publicizing the same name was already there");
         }
      }
   }

   /**
    * Binds an object in JNDI through {@link NonSerializableFactory},
    * creating intermediate subcontexts as needed.
    *
    * @param jndiName the full JNDI name to bind under
    * @param who the object to bind
    * @param classType class whose name is recorded in the bound reference
    * @param ctx the context to start resolving <code>jndiName</code> from
    * @throws Exception if parsing the name or any JNDI operation fails
    */
   protected void bind(String jndiName, Object who, Class classType, Context ctx) throws Exception
   {
      // The object itself is kept by the factory; JNDI only holds a reference to it.
      NonSerializableFactory.bind(jndiName, who);
      Name n = ctx.getNameParser("").parse(jndiName);
      while (n.size() > 1)
      {
         String ctxName = n.get(0);
         try
         {
            ctx = (Context) ctx.lookup(ctxName);
         }
         catch (NameNotFoundException e)
         {
            log.debug("creating Subcontext" + ctxName);
            ctx = ctx.createSubcontext(ctxName);
         }
         n = n.getSuffix(1);
      }

      StringRefAddr addr = new StringRefAddr("nns", jndiName);
      Reference ref = new Reference(classType.getName(), addr, NonSerializableFactory.class.getName(), null);
      ctx.rebind(n.get(0), ref);
   }

   /**
    * Computes the members present in the old view but not in the new one.
    *
    * @param oldMembers previous membership; <code>null</code> is treated as empty
    * @param newMembers new membership; <code>null</code> is treated as empty
    * @return a new vector with the members that left
    */
   protected Vector getDeadMembers(Vector oldMembers, Vector newMembers)
   {
      boolean debug = log.isDebugEnabled();
      if (oldMembers == null) oldMembers = new Vector();
      if (newMembers == null) newMembers = new Vector();
      Vector dead = (Vector) oldMembers.clone();
      dead.removeAll(newMembers);
      if (dead.size() > 0 && debug)
         log.debug("dead members: " + dead);
      return dead;
   }

   /**
    * Computes the members present in the new view but not in the old one.
    *
    * @param oldMembers previous membership; <code>null</code> is treated as empty
    * @param allMembers new membership; <code>null</code> is treated as empty
    * @return a new vector with the members that joined
    */
   protected Vector getNewMembers(Vector oldMembers, Vector allMembers)
   {
      if (oldMembers == null) oldMembers = new Vector();
      if (allMembers == null) allMembers = new Vector();
      Vector newMembers = (Vector) allMembers.clone();
      newMembers.removeAll(oldMembers);
      return newMembers;
   }

   /**
    * Notifies every listener in <code>theListeners</code> of a membership
    * change while holding the list's monitor. A failure in one listener is
    * logged and does not prevent the remaining listeners from being called.
    *
    * @param theListeners the listeners to notify
    * @param viewID id of the new view (used for logging)
    * @param allMembers members of the new view
    * @param deadMembers members that left
    * @param newMembers members that joined
    * @param originatingGroups subgroups of a merge, or <code>null</code> if
    *        the view change is not a merge
    */
   protected void notifyListeners(ArrayList theListeners, long viewID,
      Vector allMembers, Vector deadMembers, Vector newMembers,
      Vector originatingGroups)
   {
      log.debug("Begin notifyListeners, viewID: " + viewID);
      synchronized (theListeners)
      {
         for (int i = 0; i < theListeners.size(); i++)
         {
            HAMembershipListener aListener = null;
            try
            {
               aListener = (HAMembershipListener) theListeners.get(i);
               if (originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
               {
                  HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
                  exListener.membershipChangedDuringMerge(deadMembers, newMembers,
                     allMembers, originatingGroups);
               }
               else
               {
                  aListener.membershipChanged(deadMembers, newMembers, allMembers);
               }
            }
            catch (Throwable e)
            {
               log.warn("HAMembershipListener callback failure: " + aListener, e);
            }
         }
      }
      log.debug("End notifyListeners, viewID: " + viewID);
   }

   /**
    * Wraps each JGroups address in a {@link ClusterNode}.
    *
    * @param jgAddresses vector of {@link IpAddress} instances; may be <code>null</code>
    * @return a new vector of {@link ClusterNode} instances in the same order,
    *         or <code>null</code> if <code>jgAddresses</code> is <code>null</code>
    */
   protected Vector translateAddresses(Vector jgAddresses)
   {
      if (jgAddresses == null)
         return null;

      Vector result = new Vector(jgAddresses.size());
      for (int i = 0; i < jgAddresses.size(); i++)
      {
         IpAddress addr = (IpAddress) jgAddresses.elementAt(i);
         result.add(new ClusterNode(addr));
      }

      return result;
   }

   /**
    * Appends a timestamped message to {@link #history}. Failures are
    * deliberately ignored so that history keeping can never break the caller.
    *
    * @param message the text to record
    */
   public void logHistory(String message)
   {
      try
      {
         history.add(new SimpleDateFormat().format(new Date()) + " : " + message);
      }
      catch (Exception ignored) {}
   }

   // Private -------------------------------------------------------

   /**
    * Builds the <code>MethodCall</code> for a handler-qualified method name.
    */
   private MethodCall createMethodCall(String objName, String methodName,
      Object[] args, Class[] types)
   {
      if (types != null)
         return new MethodCall(objName + "." + methodName, args, types);
      return new MethodCall(objName + "." + methodName, args);
   }

   /**
    * Extracts the usable values from a group RPC response list, dropping
    * responses that were not received and "no handler" markers.
    */
   private ArrayList processResponseList(RspList rsp, boolean trace)
   {
      ArrayList rtn = new ArrayList();
      if (rsp == null)
         return rtn;

      for (int i = 0; i < rsp.size(); i++)
      {
         Object item = rsp.elementAt(i);
         if (item instanceof Rsp)
         {
            Rsp response = (Rsp) item;
            if (response.wasReceived())
            {
               item = response.getValue();
               if (!(item instanceof NoHandlerForRPC))
                  rtn.add(item);
            }
            else if (trace)
               log.trace("Ignoring non-received response: " + response);
         }
         else
         {
            if (!(item instanceof NoHandlerForRPC))
               rtn.add(item);
            else if (trace)
               log.trace("Ignoring NoHandlerForRPC");
         }
      }
      return rtn;
   }

   // Inner classes -------------------------------------------------

   /** Value holder describing one view change, passed to the asynchronous notifier. */
   private static class ViewChangeEvent
   {
      long viewId;
      Vector deadMembers;
      Vector newMembers;
      Vector allMembers;
      Vector originatingGroups;
   }

   /**
    * Takes view change events off the queue and notifies the asynchronous
    * listeners, until the thread is interrupted.
    */
   private class AsynchViewChangeHandler implements Runnable
   {
      public void run()
      {
         log.debug("Begin AsynchViewChangeHandler");
         while (true)
         {
            try
            {
               ViewChangeEvent event = (ViewChangeEvent) asynchViewChanges.take();
               notifyListeners(asynchListeners, event.viewId, event.allMembers,
                  event.deadMembers, event.newMembers, event.originatingGroups);
            }
            catch (InterruptedException e)
            {
               log.debug("AsynchViewChangeHandler interrupted", e);
               break;
            }
         }
         log.debug("End AsynchViewChangeHandler");
      }
   }
}