1 22 package org.jboss.ha.framework.server; 23 24 import java.io.ByteArrayInputStream ; 25 import java.io.ByteArrayOutputStream ; 26 import java.io.IOException ; 27 import java.io.InputStream ; 28 import java.io.OutputStream ; 29 import java.io.Serializable ; 30 import java.net.InetAddress ; 31 import java.rmi.dgc.VMID ; 32 import java.rmi.server.UID ; 33 import java.text.SimpleDateFormat ; 34 import java.util.ArrayList ; 35 import java.util.Date ; 36 import java.util.HashMap ; 37 import java.util.Iterator ; 38 import java.util.Map ; 39 import java.util.Vector ; 40 41 import javax.naming.Context ; 42 import javax.naming.InitialContext ; 43 import javax.naming.Name ; 44 import javax.naming.NameNotFoundException ; 45 import javax.naming.Reference ; 46 import javax.naming.StringRefAddr ; 47 48 import org.jboss.cache.Cache; 49 import org.jboss.ha.framework.interfaces.ClusterNode; 50 import org.jboss.ha.framework.interfaces.DistributedReplicantManager; 51 import org.jboss.ha.framework.interfaces.DistributedState; 52 import org.jboss.ha.framework.interfaces.HAPartition; 53 import org.jboss.invocation.MarshalledValueInputStream; 54 import org.jboss.invocation.MarshalledValueOutputStream; 55 import org.jboss.logging.Logger; 56 import org.jboss.naming.NonSerializableFactory; 57 import org.jboss.system.ServiceMBeanSupport; 58 import org.jboss.system.server.ServerConfigUtil; 59 import org.jgroups.Channel; 60 import org.jgroups.Event; 61 import org.jgroups.ExtendedMessageListener; 62 import org.jgroups.JChannel; 63 import org.jgroups.MembershipListener; 64 import org.jgroups.MergeView; 65 import org.jgroups.Message; 66 import org.jgroups.MessageListener; 67 import org.jgroups.Version; 68 import org.jgroups.View; 69 import org.jgroups.blocks.GroupRequest; 70 import org.jgroups.blocks.MethodCall; 71 import org.jgroups.blocks.RequestHandler; 72 import org.jgroups.blocks.RpcDispatcher; 73 import org.jgroups.debug.Debugger; 74 import org.jgroups.jmx.JChannelFactoryMBean; 75 import org.jgroups.stack.IpAddress; 76 import org.jgroups.util.Rsp; 77 import org.jgroups.util.RspList; 78 79 90 public class ClusterPartition 91 extends ServiceMBeanSupport 92 implements MembershipListener, HAPartition, 93 AsynchEventHandler.AsynchEventProcessor, 94 ClusterPartitionMBean 95 { 96 private static final byte NULL_VALUE = 0; 97 private static final byte SERIALIZABLE_VALUE = 1; 98 101 104 private static class NoHandlerForRPC implements Serializable 105 { 106 static final long serialVersionUID = -1263095408483622838L; 107 } 108 109 private static class StateStreamEnd implements Serializable 110 { 111 112 private static final long serialVersionUID = -3705345735451504946L; 113 } 114 115 117 119 121 protected ClusterPartitionConfig config; 122 protected HashMap rpcHandlers = new HashMap (); 123 protected HashMap stateHandlers = new HashMap (); 124 125 protected boolean allowSyncListeners = false; 126 127 protected ArrayList synchListeners = new ArrayList (); 128 129 protected ArrayList asynchListeners = new ArrayList (); 130 131 protected AsynchEventHandler asynchHandler; 132 133 protected Vector members = null; 134 protected Vector jgmembers = null; 135 136 public Vector history = null; 137 138 139 protected Vector otherMembers = null; 140 protected Vector jgotherMembers = null; 141 142 protected org.jgroups.stack.IpAddress localJGAddress = null; 143 144 protected String nodeName; 145 146 protected ClusterNode me = null; 147 148 protected JChannel channel; 149 150 protected DistributedReplicantManager replicantManager; 151 152 protected Logger log; 153 protected Logger clusterLifeCycleLog; 154 155 protected long currentViewId = -1; 156 157 private RpcDispatcher dispatcher = null; 158 159 162 protected boolean isStateSet = false; 163 164 167 protected Exception setStateException; 168 private final Object stateLock = new Object (); 169 private final MessageListenerAdapter messageListener = new MessageListenerAdapter(); 170 private Debugger debugger; 171 private boolean selfCreatedDRM; 172 173 175 178 public static Object objectFromByteBuffer (byte[] buffer) throws Exception 179 { 180 if(buffer == null) 181 return null; 182 183 ByteArrayInputStream bais = new ByteArrayInputStream (buffer); 184 MarshalledValueInputStream mvis = new MarshalledValueInputStream(bais); 185 return mvis.readObject(); 186 } 187 188 192 public static byte[] objectToByteBuffer (Object obj) throws Exception 193 { 194 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 195 MarshalledValueOutputStream mvos = new MarshalledValueOutputStream(baos); 196 mvos.writeObject(obj); 197 mvos.flush(); 198 return baos.toByteArray(); 199 } 200 201 private static JChannel createMuxChannel(ClusterPartitionConfig config) 202 { 203 JChannelFactoryMBean factory = config.getMultiplexer(); 204 if (factory == null) 205 throw new IllegalStateException ("HAPartitionConfig has no JChannelFactory"); 206 String stack = config.getMultiplexerStack(); 207 if (stack == null) 208 throw new IllegalStateException ("HAPartitionConfig has no multiplexer stack"); 209 try 210 { 211 return (JChannel) factory.createMultiplexerChannel(stack, config.getMultiplexerStack()); 212 } 213 catch (RuntimeException e) 214 { 215 throw e; 216 } 217 catch (Exception e) 218 { 219 throw new RuntimeException ("Failure creatig multiplexed Channel", e); 220 } 221 } 222 223 225 public ClusterPartition(ClusterPartitionConfig config) 226 { 227 if (config == null) 228 throw new IllegalArgumentException ("config cannot be null"); 229 230 this.config = config; 231 setupLoggers(config.getPartitionName()); 232 this.history = new Vector (); 233 logHistory ("Partition object created"); 234 } 235 236 238 240 protected void createService() throws Exception 241 { 242 if (config == null) 243 throw new IllegalArgumentException ("config cannot be null"); 244 log.debug("Creating Multiplexer Channel for partition " + getPartitionName() + 245 " using stack " + getMultiplexerStack()); 246 247 channel = createMuxChannel(config); 248 249 channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE); 250 channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE); 251 252 log.info("Initializing partition " + getPartitionName()); 253 logHistory ("Initializing partition " + getPartitionName()); 254 255 dispatcher = new RpcHandler(channel, null, null, new Object (), config.getDeadlockDetection()); 256 257 log.debug("setMembershipListener"); 259 dispatcher.setMembershipListener(this); 260 log.debug("setMessageListener"); 261 dispatcher.setMessageListener(messageListener); 262 dispatcher.setMarshaller(new MarshallerImpl()); 263 264 if (replicantManager == null) 266 { 267 log.debug("create replicant manager"); 269 DistributedReplicantManagerImpl drm = new DistributedReplicantManagerImpl(this); 270 if (server != null) 271 drm.registerWithJmx(server); 272 log.debug("create replicant manager"); 273 drm.create(); 274 setDistributedReplicantManager(drm); 275 this.selfCreatedDRM = true; 276 } 277 278 284 asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler"); 286 287 log.debug("done initializing partition"); 288 } 289 290 protected void startService() throws Exception 291 { 292 logHistory ("Starting partition"); 293 294 configureUniqueId(); 296 297 channel.connect(getPartitionName()); 298 299 try 300 { 301 303 log.debug("get nodeName"); 304 this.localJGAddress = (IpAddress)channel.getLocalAddress(); 305 this.me = new ClusterNode(this.localJGAddress); 306 this.nodeName = this.me.getName(); 307 308 log.debug("Get current members"); 310 View view = channel.getView(); 311 this.jgmembers = (Vector )view.getMembers().clone(); 312 this.members = translateAddresses(this.jgmembers); log.info("Number of cluster members: " + members.size()); 314 for(int m = 0; m > members.size(); m ++) 315 { 316 Object node = members.get(m); 317 log.debug(node); 318 } 319 321 this.jgotherMembers = (Vector )view.getMembers().clone(); 322 this.jgotherMembers.remove (channel.getLocalAddress()); 323 this.otherMembers = translateAddresses(this.jgotherMembers); log.info ("Other members: " + this.otherMembers.size ()); 325 326 verifyNodeIsUnique(view.getMembers()); 327 328 this.currentViewId = view.getVid().getId(); 330 331 fetchState(); 334 335 if (selfCreatedDRM) 336 { 337 ((DistributedReplicantManagerImpl) this.replicantManager).start(); 339 } 340 341 asynchHandler.start(); 343 344 Context ctx = new InitialContext (); 346 this.bind("/HAPartition/" + getPartitionName(), this, ClusterPartition.class, ctx); 347 log.debug("Bound in JNDI under /HAPartition/" + getPartitionName()); 348 } 349 catch (Throwable t) 350 { 351 log.debug("Caught exception after channel connected; closing channel -- " + t.getLocalizedMessage()); 352 channel.disconnect(); 353 throw (t instanceof Exception ) ? (Exception ) t : new RuntimeException (t); 354 } 355 356 } 357 358 protected void stopService() throws Exception 359 { 360 logHistory ("Stopping partition"); 361 log.info("Stopping partition " + getPartitionName()); 362 363 stopChannelDebugger(); 364 365 try 366 { 367 asynchHandler.stop(); 368 } 369 catch( Exception e) 370 { 371 log.warn("Failed to stop asynchHandler", e); 372 } 373 374 375 if (selfCreatedDRM) 376 { 377 try 380 { 381 ((DistributedReplicantManagerImpl) this.replicantManager).stop(); 382 } 383 catch (Exception e) 384 { 385 log.error("operation failed", e); 386 } 387 } 388 389 try 392 { 393 channel.disconnect(); 394 } 395 catch (Exception e) 396 { 397 log.error("operation failed", e); 398 } 399 400 String boundName = "/HAPartition/" + getPartitionName(); 401 402 InitialContext ctx = new InitialContext (); 403 try 404 { 405 ctx.unbind(boundName); 406 } 407 finally 408 { 409 ctx.close(); 410 } 411 412 NonSerializableFactory.unbind (boundName); 413 414 log.info("Partition " + getPartitionName() + " stopped."); 415 } 416 417 protected void destroyService() throws Exception 418 { 419 log.debug("Destroying HAPartition: " + getPartitionName()); 420 421 if (selfCreatedDRM) 422 { 423 try 424 { 425 if (server != null) 426 ((DistributedReplicantManagerImpl) replicantManager).unregisterWithJmx(server); 427 ((DistributedReplicantManagerImpl) replicantManager).destroy(); 428 } 429 catch (Exception e) 430 { 431 log.error("Destroying DRM failed", e); 432 } 433 } 434 try 435 { 436 channel.close(); 437 } 438 catch (Exception e) 439 { 440 log.error("Closing channel failed", e); 441 } 442 443 log.info("Partition " + getPartitionName() + " destroyed."); 444 } 445 446 448 449 protected void fetchState() throws Exception 450 { 451 log.info("Fetching serviceState (will wait for " + getStateTransferTimeout() + 452 " milliseconds):"); 453 long start, stop; 454 isStateSet = false; 455 start = System.currentTimeMillis(); 456 boolean rc = channel.getState(null, getStateTransferTimeout()); 457 if (rc) 458 { 459 synchronized (stateLock) 460 { 461 while (!isStateSet) 462 { 463 if (setStateException != null) 464 throw setStateException; 465 466 try 467 { 468 stateLock.wait(); 469 } 470 catch (InterruptedException iex) 471 { 472 } 473 } 474 } 475 stop = System.currentTimeMillis(); 476 log.info("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)"); 477 } 478 else 479 { 480 484 synchronized (members) 485 { 486 while (members.size() == 0) 487 { 488 log.debug("waiting on viewAccepted()"); 489 try 490 { 491 members.wait(); 492 } 493 catch (InterruptedException iex) 494 { 495 } 496 } 497 } 498 499 if (isCurrentNodeCoordinator()) 500 { 501 log.info("State could not be retrieved (we are the first member in group)"); 502 } 503 else 504 { 505 throw new IllegalStateException ("Initial serviceState transfer failed: " + 506 "Channel.getState() returned false"); 507 } 508 } 509 } 510 511 private void getStateInternal(OutputStream stream) throws IOException 512 { 513 MarshalledValueOutputStream mvos = null; 515 for (Iterator keys = stateHandlers.entrySet().iterator(); keys.hasNext(); ) 516 { 517 Map.Entry entry = (Map.Entry )keys.next(); 518 HAPartition.HAPartitionStateTransfer subscriber = 519 (HAPartition.HAPartitionStateTransfer) entry.getValue(); 520 log.debug("getState for " + entry.getKey()); 521 Object state = subscriber.getCurrentState(); 522 if (state != null) 523 { 524 if (mvos == null) 525 { 526 stream.write(SERIALIZABLE_VALUE); 528 529 mvos = new MarshalledValueOutputStream(stream); 530 } 531 532 mvos.writeObject(entry.getKey()); 533 mvos.writeObject(state); 534 } 535 } 536 537 if (mvos == null) 538 { 539 stream.write(NULL_VALUE); 541 } 542 else 543 { 544 mvos.writeObject(new StateStreamEnd()); 545 } 546 547 } 548 549 private void setStateInternal(InputStream stream) throws IOException , ClassNotFoundException 550 { 551 byte type = (byte) stream.read(); 552 553 if (type == NULL_VALUE) 554 { 555 log.debug("serviceState is null"); 556 return; 557 } 558 559 long used_mem_before, used_mem_after; 560 Runtime rt=Runtime.getRuntime(); 561 used_mem_before=rt.totalMemory() - rt.freeMemory(); 562 563 MarshalledValueInputStream mvis = new MarshalledValueInputStream(stream); 564 565 while (true) 566 { 567 Object obj = mvis.readObject(); 568 if (obj instanceof StateStreamEnd) 569 break; 570 571 String key = (String ) obj; 572 log.debug("setState for " + key); 573 Object someState = mvis.readObject(); 574 HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key); 575 if (subscriber != null) 576 { 577 try 578 { 579 subscriber.setCurrentState((Serializable )someState); 580 } 581 catch (Exception e) 582 { 583 if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key)) 588 { 589 if (e instanceof RuntimeException ) 590 throw (RuntimeException ) e; 591 else 592 throw new RuntimeException (e); 593 } 594 else 595 { 596 log.error("Caught exception setting serviceState to " + subscriber, e); 597 } 598 } 599 } 600 else 601 { 602 log.debug("There is no stateHandler for: " + key); 603 } 604 } 605 606 used_mem_after=rt.totalMemory() - rt.freeMemory(); 607 log.debug("received serviceState; expanded memory by " + 608 (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before + 609 ", used memory after: " + used_mem_after + ")"); 610 } 611 612 private void recordSetStateFailure(Throwable t) 613 { 614 log.error("failed setting serviceState", t); 615 if (t instanceof Exception ) 616 setStateException = (Exception ) t; 617 else 618 setStateException = new Exception (t); 619 } 620 621 private void notifyStateTransferCompleted() 622 { 623 synchronized (stateLock) 624 { 625 stateLock.notifyAll(); 627 } 628 } 629 630 632 public void suspect(org.jgroups.Address suspected_mbr) 633 { 634 logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString())); 635 if (isCurrentNodeCoordinator ()) 636 clusterLifeCycleLog.info ("Suspected member: " + suspected_mbr); 637 else 638 log.info("Suspected member: " + suspected_mbr); 639 } 640 641 public void block() {} 642 643 652 public void viewAccepted(View newView) 653 { 654 try 655 { 656 this.currentViewId = newView.getVid().getId(); 659 660 this.jgotherMembers = (Vector )newView.getMembers().clone(); 663 this.jgotherMembers.remove (channel.getLocalAddress()); 664 this.otherMembers = translateAddresses (this.jgotherMembers); Vector translatedNewView = translateAddresses ((Vector )newView.getMembers().clone()); 666 logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId + 667 " (old view: " + this.members + " )"); 668 669 670 Vector oldMembers = this.members; 672 673 Vector newjgMembers = (Vector )newView.getMembers().clone(); 674 Vector newMembers = translateAddresses(newjgMembers); if (this.members == null) 676 { 677 this.members = newMembers; 679 this.jgmembers = newjgMembers; 680 log.debug("ViewAccepted: initial members set"); 681 return; 682 } 683 this.members = newMembers; 684 this.jgmembers = newjgMembers; 685 686 int difference = 0; 687 if (oldMembers == null) 688 difference = newMembers.size () - 1; 689 else 690 difference = newMembers.size () - oldMembers.size (); 691 692 if (isCurrentNodeCoordinator ()) 693 clusterLifeCycleLog.info ("New cluster view for partition " + getPartitionName() + " (id: " + 694 this.currentViewId + ", delta: " + difference + ") : " + this.members); 695 else 696 log.info("New cluster view for partition " + getPartitionName() + ": " + 697 this.currentViewId + " (" + this.members + " delta: " + difference + ")"); 698 699 ViewChangeEvent event = new ViewChangeEvent(); 701 event.viewId = currentViewId; 702 event.allMembers = translatedNewView; 703 event.deadMembers = getDeadMembers(oldMembers, event.allMembers); 704 event.newMembers = getNewMembers(oldMembers, event.allMembers); 705 event.originatingGroups = null; 706 if(newView instanceof MergeView) 708 { 709 MergeView mergeView = (MergeView) newView; 710 event.originatingGroups = mergeView.getSubgroups(); 711 } 712 713 log.debug("membership changed from " + this.members.size() + " to " 714 + event.allMembers.size()); 715 this.asynchHandler.queueEvent(event); 717 718 if (this.allowSyncListeners) 720 { 721 this.notifyListeners(synchListeners, event.viewId, event.allMembers, 722 event.deadMembers, event.newMembers, event.originatingGroups); 723 } 724 } 725 catch (Exception ex) 726 { 727 log.error("ViewAccepted failed", ex); 728 } 729 } 730 731 733 public String getNodeName() 734 { 735 return nodeName; 736 } 737 738 public String getPartitionName() 739 { 740 return (config == null ? null : config.getPartitionName()); 741 } 742 743 public DistributedReplicantManager getDistributedReplicantManager() 744 { 745 return replicantManager; 746 } 747 748 public DistributedState getDistributedStateService() 749 { 750 return config.getDistributedState(); 751 } 752 753 public long getCurrentViewId() 754 { 755 return this.currentViewId; 756 } 757 758 public Vector getCurrentView() 759 { 760 Vector result = new Vector (this.members.size()); 761 for (int i = 0; i < members.size(); i++) 762 { 763 result.add( ((ClusterNode) members.elementAt(i)).getName() ); 764 } 765 return result; 766 } 767 768 public ClusterNode[] getClusterNodes () 769 { 770 ClusterNode[] nodes = new ClusterNode[this.members.size()]; 771 this.members.toArray(nodes); 772 return nodes; 773 } 774 775 public ClusterNode getClusterNode () 776 { 777 return me; 778 } 779 780 public boolean isCurrentNodeCoordinator () 781 { 782 if(this.members == null || this.members.size() == 0 || this.me == null) 783 return false; 784 return this.members.elementAt (0).equals (this.me); 785 } 786 787 public void registerRPCHandler(String objName, Object subscriber) 794 { 795 rpcHandlers.put(objName, subscriber); 796 } 797 798 public void unregisterRPCHandler(String objName, Object subscriber) 799 { 800 rpcHandlers.remove(objName); 801 } 802 803 804 814 public ArrayList callMethodOnCluster(String objName, String methodName, 815 Object [] args, boolean excludeSelf) throws Exception 816 { 817 return callMethodOnCluster(objName, methodName, args, null, excludeSelf); 818 } 819 820 823 public ArrayList callMethodOnCluster(String objName, String methodName, 824 Object [] args, Class [] types, boolean excludeSelf) throws Exception 825 { 826 return callMethodOnCluster(objName, methodName, args, types, excludeSelf, getMethodCallTimeout()); 827 } 828 829 830 public ArrayList callMethodOnCluster(String objName, String methodName, 831 Object [] args, Class [] types, boolean excludeSelf, long methodTimeout) throws Exception 832 { 833 ArrayList rtn = new ArrayList (); 834 MethodCall m=null; 835 RspList rsp = null; 836 boolean trace = log.isTraceEnabled(); 837 838 if(types != null) 839 m=new MethodCall(objName + "." + methodName, args, types); 840 else 841 m=new MethodCall(objName + "." + methodName, args); 842 843 if (excludeSelf) 844 { 845 if( trace ) 846 { 847 log.trace("callMethodOnCluster(true), objName="+objName 848 +", methodName="+methodName+", members="+jgotherMembers); 849 } 850 rsp = dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_ALL, methodTimeout); 851 } 852 else 853 { 854 if( trace ) 855 { 856 log.trace("callMethodOnCluster(false), objName="+objName 857 +", methodName="+methodName+", members="+members); 858 } 859 rsp = dispatcher.callRemoteMethods(null, m, GroupRequest.GET_ALL, methodTimeout); 860 } 861 862 if (rsp != null) 863 { 864 for (int i = 0; i < rsp.size(); i++) 865 { 866 Object item = rsp.elementAt(i); 867 if (item instanceof Rsp) 868 { 869 Rsp response = (Rsp) item; 870 boolean wasReceived = response.wasReceived(); 872 if( wasReceived == true ) 873 { 874 item = response.getValue(); 875 if (!(item instanceof NoHandlerForRPC)) 876 rtn.add(item); 877 } 878 else if( trace ) 879 log.trace("Ignoring non-received response: "+response); 880 } 881 else 882 { 883 if (!(item instanceof NoHandlerForRPC)) 884 rtn.add(item); 885 else if( trace ) 886 log.trace("Ignoring NoHandlerForRPC"); 887 } 888 } 889 } 890 891 return rtn; 892 } 893 894 906 public ArrayList callMethodOnCoordinatorNode(String objName, String methodName, 907 Object [] args, Class [] types,boolean excludeSelf) throws Exception 908 { 909 return callMethodOnCoordinatorNode(objName,methodName,args,types,excludeSelf, getMethodCallTimeout()); 910 } 911 912 925 public ArrayList callMethodOnCoordinatorNode(String objName, String methodName, 926 Object [] args, Class [] types,boolean excludeSelf, long methodTimeout) throws Exception 927 { 928 ArrayList rtn = new ArrayList (); 929 MethodCall m=null; 930 RspList rsp = null; 931 boolean trace = log.isTraceEnabled(); 932 933 if(types != null) 934 m=new MethodCall(objName + "." + methodName, args, types); 935 else 936 m=new MethodCall(objName + "." + methodName, args); 937 938 if( trace ) 939 { 940 log.trace("callMethodOnCoordinatorNode(false), objName="+objName 941 +", methodName="+methodName); 942 } 943 944 Vector coordinatorOnly = new Vector (); 946 if (false == isCurrentNodeCoordinator () || 948 false == excludeSelf) 949 coordinatorOnly.addElement(this.jgmembers.elementAt (0)); 950 951 rsp = dispatcher.callRemoteMethods(coordinatorOnly, m, GroupRequest.GET_ALL, methodTimeout); 952 953 if (rsp != null) 954 { 955 for (int i = 0; i < rsp.size(); i++) 956 { 957 Object item = rsp.elementAt(i); 958 if (item instanceof Rsp) 959 { 960 Rsp response = (Rsp) item; 961 boolean wasReceived = response.wasReceived(); 963 if( wasReceived == true ) 964 { 965 item = response.getValue(); 966 if (!(item instanceof NoHandlerForRPC)) 967 rtn.add(item); 968 } 969 else if( trace ) 970 log.trace("Ignoring non-received response: "+response); 971 } 972 else 973 { 974 if (!(item instanceof NoHandlerForRPC)) 975 rtn.add(item); 976 else if( trace ) 977 log.trace("Ignoring NoHandlerForRPC"); 978 } 979 } 980 } 981 982 return rtn; 983 } 984 985 986 995 public void callAsynchMethodOnCluster(String objName, String methodName, 996 Object [] args, boolean excludeSelf) 997 throws Exception 998 { 999 callAsynchMethodOnCluster(objName, methodName, args, null, excludeSelf); 1000 } 1001 1002 1005 public void callAsynchMethodOnCluster(String objName, String methodName, 1006 Object [] args, Class [] types, boolean excludeSelf) throws Exception 1007 { 1008 MethodCall m = null; 1009 boolean trace = log.isTraceEnabled(); 1010 1011 if(types != null) 1012 m=new MethodCall(objName + "." + methodName, args, types); 1013 else 1014 m=new MethodCall(objName + "." + methodName, args); 1015 1016 if (excludeSelf) 1017 { 1018 if( trace ) 1019 { 1020 log.trace("callAsynchMethodOnCluster(true), objName="+objName 1021 +", methodName="+methodName+", members="+jgotherMembers); 1022 } 1023 dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, getMethodCallTimeout()); 1024 } 1025 else 1026 { 1027 if( trace ) 1028 { 1029 log.trace("callAsynchMethodOnCluster(false), objName="+objName 1030 +", methodName="+methodName+", members="+members); 1031 } 1032 dispatcher.callRemoteMethods(null, m, GroupRequest.GET_NONE, getMethodCallTimeout()); 1033 } 1034 } 1035 1036 public void subscribeToStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber) 1043 { 1044 stateHandlers.put(objectName, subscriber); 1045 } 1046 1047 public void unsubscribeFromStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber) 1048 { 1049 stateHandlers.remove(objectName); 1050 } 1051 1052 public void registerMembershipListener(HAMembershipListener listener) 1059 { 1060 boolean isAsynch = (this.allowSyncListeners == false) 1061 || (listener instanceof AsynchHAMembershipListener) 1062 || (listener instanceof AsynchHAMembershipExtendedListener); 1063 if( isAsynch ) { 1064 synchronized(this.asynchListeners) { 1065 this.asynchListeners.add(listener); 1066 } 1067 } 1068 else { 1069 synchronized(this.synchListeners) { 1070 this.synchListeners.add(listener); 1071 } 1072 } 1073 } 1074 1075 public void unregisterMembershipListener(HAMembershipListener listener) 1076 { 1077 boolean isAsynch = (this.allowSyncListeners == false) 1078 || (listener instanceof AsynchHAMembershipListener) 1079 || (listener instanceof AsynchHAMembershipExtendedListener); 1080 if( isAsynch ) { 1081 synchronized(this.asynchListeners) { 1082 this.asynchListeners.remove(listener); 1083 } 1084 } 1085 else { 1086 synchronized(this.synchListeners) { 1087 this.synchListeners.remove(listener); 1088 } 1089 } 1090 } 1091 1092 public boolean getAllowSynchronousMembershipNotifications() 1093 { 1094 return allowSyncListeners; 1095 } 1096 1097 public void setAllowSynchronousMembershipNotifications(boolean allowSync) 1098 { 1099 this.allowSyncListeners = allowSync; 1100 } 1101 1102 1104 public void processEvent(Object event) 1105 { 1106 ViewChangeEvent vce = (ViewChangeEvent) event; 1107 notifyListeners(asynchListeners, vce.viewId, vce.allMembers, 1108 vce.deadMembers, vce.newMembers, vce.originatingGroups); 1109 1110 } 1111 1112 1113 1115 public void setDistributedReplicantManager(DistributedReplicantManager drm) 1116 { 1117 if (this.replicantManager != null && !(replicantManager == drm)) 1118 throw new IllegalStateException ("DistributedReplicantManager already set"); 1119 1120 this.replicantManager = drm; 1121 } 1122 1123 1125 protected void verifyNodeIsUnique (Vector javaGroupIpAddresses) throws Exception 1126 { 1127 byte[] localUniqueName = this.localJGAddress.getAdditionalData(); 1128 if (localUniqueName == null) 1129 { 1130 log.error("No additional information has been found in the JGroups address; " + 1131 "make sure you are running with a correct version of JGroups and that the protocols " + 1132 "you are using support 'additionalData' behaviour."); 1133 throw new Exception ("Local node (" + this.localJGAddress + ") removed from cluster; local node name is missing."); 1134 } 1135 1136 for (int i = 0; i < javaGroupIpAddresses.size(); i++) 1137 { 1138 IpAddress address = (IpAddress) javaGroupIpAddresses.elementAt(i); 1139 if (!address.equals(this.localJGAddress)) 1140 { 1141 if (localUniqueName.equals(address.getAdditionalData())) 1142 throw new Exception ("Local node (" + this.localJGAddress + ") removed from cluster; another node (" + address + ") publicizing the same name was already there."); 1143 } 1144 } 1145 } 1146 1147 1155 protected void bind(String jndiName, Object who, Class classType, Context ctx) throws Exception 1156 { 1157 NonSerializableFactory.bind(jndiName, who); 1160 Name n = ctx.getNameParser("").parse(jndiName); 1161 while (n.size () > 1) 1162 { 1163 String ctxName = n.get (0); 1164 try 1165 { 1166 ctx = (Context )ctx.lookup (ctxName); 1167 } 1168 catch (NameNotFoundException e) 1169 { 1170 log.debug ("creating Subcontext " + ctxName); 1171 ctx = ctx.createSubcontext (ctxName); 1172 } 1173 n = n.getSuffix (1); 1174 } 1175 1176 StringRefAddr addr = new StringRefAddr ("nns", jndiName); 1180 Reference ref = new Reference (classType.getName (), addr, NonSerializableFactory.class.getName (), null); 1181 ctx.rebind (n.get (0), ref); 1182 } 1183 1184 1191 protected Vector getDeadMembers(Vector oldMembers, Vector newMembers) 1192 { 1193 if(oldMembers == null) oldMembers=new Vector (); 1194 if(newMembers == null) newMembers=new Vector (); 1195 Vector dead=(Vector )oldMembers.clone(); 1196 dead.removeAll(newMembers); 1197 log.debug("dead members: " + dead); 1198 return dead; 1199 } 1200 1201 1207 protected Vector getNewMembers(Vector oldMembers, Vector allMembers) 1208 { 1209 if(oldMembers == null) oldMembers=new Vector (); 1210 if(allMembers == null) allMembers=new Vector (); 1211 Vector newMembers=(Vector )allMembers.clone(); 1212 newMembers.removeAll(oldMembers); 1213 return newMembers; 1214 } 1215 1216 protected void notifyListeners(ArrayList theListeners, long viewID, 1217 Vector allMembers, Vector deadMembers, Vector newMembers, 1218 Vector originatingGroups) 1219 { 1220 log.debug("Begin notifyListeners, viewID: "+viewID); 1221 synchronized(theListeners) 1222 { 1223 theListeners = (ArrayList ) theListeners.clone(); 1225 } 1226 1227 for (int i = 0; i < theListeners.size(); i++) 1228 { 1229 HAMembershipListener aListener = null; 1230 try 1231 { 1232 aListener = (HAMembershipListener) theListeners.get(i); 1233 if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener)) 1234 { 1235 HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener; 1236 exListener.membershipChangedDuringMerge (deadMembers, newMembers, 1237 allMembers, originatingGroups); 1238 } 1239 else 1240 { 1241 aListener.membershipChanged(deadMembers, newMembers, allMembers); 1242 } 1243 } 1244 catch (Throwable e) 1245 { 1246 log.warn("HAMembershipListener callback failure: "+aListener, e); 1248 } 1249 } 1250 1251 log.debug("End notifyListeners, viewID: "+viewID); 1252 } 1253 1254 protected Vector translateAddresses (Vector jgAddresses) 1255 { 1256 if (jgAddresses == null) 1257 return null; 1258 1259 Vector result = new Vector (jgAddresses.size()); 1260 for (int i = 0; i < jgAddresses.size(); i++) 1261 { 1262 IpAddress addr = (IpAddress) jgAddresses.elementAt(i); 1263 result.add(new ClusterNode (addr)); 1264 } 1265 1266 return result; 1267 } 1268 1269 public void logHistory (String message) 1270 { 1271 try 1272 { 1273 history.add(new SimpleDateFormat ().format (new Date ()) + " : " + message); 1274 } 1275 catch (Exception ignored){} 1276 } 1277 1278 1280 public String showHistory () 1281 { 1282 StringBuffer buff = new StringBuffer (); 1283 Vector data = new Vector (this.history); 1284 for (java.util.Iterator row = data.iterator(); row.hasNext();) 1285 { 1286 String info = (String ) row.next(); 1287 buff.append(info).append("\n"); 1288 } 1289 return buff.toString(); 1290 } 1291 1292 public String showHistoryAsXML () 1293 { 1294 StringBuffer buff = new StringBuffer (); 1295 buff.append("<events>\n"); 1296 Vector data = new Vector (this.history); 1297 for (java.util.Iterator row = data.iterator(); row.hasNext();) 1298 { 1299 buff.append(" <event>\n "); 1300 String info = (String ) row.next(); 1301 buff.append(info); 1302 buff.append("\n </event>\n"); 1303 } 1304 buff.append("</events>\n"); 1305 return buff.toString(); 1306 } 1307 1308 public void startChannelDebugger() 1309 { 1310 startChannelDebugger(false); 1311 } 1312 1313 public void startChannelDebugger(boolean accumulative) 1314 { 1315 if(debugger == null) 1316 { 1317 debugger=new Debugger(this.channel, accumulative); 1318 debugger.start(); 1319 } 1320 } 1321 1322 public void stopChannelDebugger() 1323 { 1324 if(debugger != null) 1325 { 1326 debugger=null; 1328 } 1329 } 1330 1331 public Cache getClusteredCache() 1332 { 1333 return config.getClusteredCache(); 1334 } 1335 1336 public boolean getDeadlockDetection() 1337 { 1338 return config.getDeadlockDetection(); 1339 } 1340 1341 public HAPartition getHAPartition() 1342 { 1343 return this; 1344 } 1345 1346 public String getJGroupsVersion() 1347 { 1348 return Version.description + "( " + Version.cvs + ")"; 1349 } 1350 1351 public JChannelFactoryMBean getMultiplexer() 1352 { 1353 return config.getMultiplexer(); 1354 } 1355 1356 public String getMultiplexerStack() 1357 { 1358 return config.getMultiplexerStack(); 1359 } 1360 1361 public InetAddress getNodeAddress() 1362 { 1363 return config.getNodeAddress(); 1364 } 1365 1366 public long getStateTransferTimeout() { 1367 return config.getStateTransferTimeout(); 1368 } 1369 1370 public long getMethodCallTimeout() { 1371 return config.getMethodCallTimeout(); 1372 } 1373 1374 public void setMethodCallTimeout(long timeout) 1375 { 1376 config.setMethodCallTimeout(timeout); 1377 } 1378 1379 public void setStateTransferTimeout(long timeout) 1380 { 1381 config.setStateTransferTimeout(timeout); 1382 } 1383 1384 public String getNodeUniqueId() 1385 { 1386 return config.getNodeUniqueId(); 1387 } 1388 1389 1391 protected void configureUniqueId() throws Exception 1392 { 1393 boolean pushNodeName = true; 1396 String uniqueId = config.getNodeUniqueId(); 1397 if (uniqueId == null || "".equals(uniqueId)) { 1398 IpAddress ourAddr = (IpAddress) channel.getLocalAddress(); 1399 if (ourAddr != null) 1400 { 1401 byte[] additional_data = ourAddr.getAdditionalData(); 1402 if (additional_data != null) 1403 { 1404 uniqueId = new String (additional_data); 1405 config.setNodeUniqueId(uniqueId); 1406 pushNodeName = false; 1407 } 1408 } 1409 } 1410 if (uniqueId == null || "".equals(uniqueId)) { 1411 uniqueId = generateUniqueId(); 1412 config.setNodeUniqueId(uniqueId); 1413 } 1414 1415 if (pushNodeName) 1416 { 1417 java.util.HashMap staticNodeName = new java.util.HashMap (); 1418 staticNodeName.put("additional_data", uniqueId.getBytes()); 1419 this.channel.down(new Event(Event.CONFIG, staticNodeName)); 1420 } 1421 1422 config.setNodeUniqueId(uniqueId); 1423 } 1424 1425 protected String generateUniqueId() throws Exception 1426 { 1427 1433 1436 1438 String hostIP = null; 1439 InetAddress address = ServerConfigUtil.fixRemoteAddress(config.getNodeAddress()); 1440 if (address == null) 1441 { 1442 log.debug ("unable to create a GUID for this cluster, check network configuration is correctly setup (getLocalHost has returned an exception)"); 1443 log.debug ("using a full GUID strategy"); 1444 return new VMID ().toString(); 1445 } 1446 else 1447 { 1448 hostIP = address.getHostAddress(); 1449 } 1450 1451 int namingPort = config.getNamingServicePort(); 1453 if (namingPort > 0) 1454 { 1455 return hostIP + ":" + namingPort; 1456 } 1457 1458 String uid = new UID ().toString(); 1461 return hostIP + ":" + uid; 1462 } 1463 1464 1466 1468 private class MessageListenerAdapter 1469 implements ExtendedMessageListener 1470 { 1471 1472 public void getState(OutputStream stream) 1473 { 1474 logHistory ("getState called on partition"); 1475 1476 log.debug("getState called."); 1477 try 1478 { 1479 getStateInternal(stream); 1480 } 1481 catch (Exception ex) 1482 { 1483 log.error("getState failed", ex); 1484 } 1485 1486 } 1487 1488 public void getState(String state_id, OutputStream ostream) 1489 { 1490 throw new UnsupportedOperationException ("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594"); 1491 } 1492 1493 public byte[] getState(String state_id) 1494 { 1495 throw new UnsupportedOperationException ("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594"); 1496 } 1497 1498 public void setState(InputStream stream) 1499 { 1500 logHistory ("setState called on partition"); 1501 try 1502 { 1503 if (stream == null) 1504 { 1505 log.debug("transferred serviceState is null (may be first member in cluster)"); 1506 } 1507 else 1508 { 1509 setStateInternal(stream); 1510 } 1511 1512 isStateSet = true; 1513 } 1514 catch (Throwable t) 1515 { 1516 recordSetStateFailure(t); 1517 } 1518 finally 1519 { 1520 notifyStateTransferCompleted(); 1521 } 1522 } 1523 1524 public byte[] getState() 1525 { 1526 logHistory ("getState called on partition"); 1527 1528 log.debug("getState called."); 1529 try 1530 { 1531 ByteArrayOutputStream baos = new ByteArrayOutputStream (1024); 1532 getStateInternal(baos); 1533 return baos.toByteArray(); 1534 } 1535 catch (Exception ex) 1536 { 1537 log.error("getState failed", ex); 1538 } 1539 return null; } 1541 1542 public void setState(String state_id, byte[] state) 1543 { 1544 throw new UnsupportedOperationException ("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594"); 1545 } 1546 1547 public void setState(String state_id, InputStream istream) 1548 { 1549 throw new UnsupportedOperationException ("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594"); 1550 } 1551 1552 public void receive(org.jgroups.Message msg) 1553 { } 1554 1555 public void setState(byte[] obj) 1556 { 1557 logHistory ("setState called on partition"); 1558 try 1559 { 1560 if (obj == null) 1561 { 1562 log.debug("transferred serviceState is null (may be first member in cluster)"); 1563 } 1564 else 1565 { 1566 ByteArrayInputStream bais = new ByteArrayInputStream (obj); 1567 setStateInternal(bais); 1568 bais.close(); 1569 } 1570 1571 isStateSet = true; 1572 } 1573 catch (Throwable t) 1574 { 1575 recordSetStateFailure(t); 1576 } 1577 finally 1578 { 1579 notifyStateTransferCompleted(); 1580 } 1581 } 1582 1583 } 1584 1585 1589 private static class ViewChangeEvent 1590 { 1591 long viewId; 1592 Vector deadMembers; 1593 Vector newMembers; 1594 Vector allMembers; 1595 Vector originatingGroups; 1596 } 1597 1598 private static class MarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller 1599 { 1600 1601 public Object objectFromByteBuffer(byte[] buf) throws Exception 1602 { 1603 return ClusterPartition.objectFromByteBuffer(buf); 1604 } 1605 1606 public byte[] objectToByteBuffer(Object obj) throws Exception 1607 { 1608 return ClusterPartition.objectToByteBuffer(obj); 1609 } 1610 } 1611 1612 1616 private class RpcHandler extends RpcDispatcher 1617 { 1618 private RpcHandler(Channel channel, MessageListener l, MembershipListener l2, Object server_obj, 1619 boolean deadlock_detection) 1620 { 1621 super(channel, l, l2, server_obj, deadlock_detection); 1622 } 1623 1624 1633 public Object handle(Message req) 1634 { 1635 Object body = null; 1636 Object retval = null; 1637 MethodCall method_call = null; 1638 boolean trace = log.isTraceEnabled(); 1639 1640 if( trace ) 1641 log.trace("Partition " + getPartitionName() + " received msg"); 1642 if(req == null || req.getBuffer() == null) 1643 { 1644 log.warn("message or message buffer is null !"); 1645 return null; 1646 } 1647 1648 try 1649 { 1650 body = objectFromByteBuffer(req.getBuffer()); 1651 } 1652 catch(Exception e) 1653 { 1654 log.warn("failed unserializing message buffer (msg=" + req + ")", e); 1655 return null; 1656 } 1657 1658 if(body == null || !(body instanceof MethodCall)) 1659 { 1660 log.warn("message does not contain a MethodCall object !"); 1661 return null; 1662 } 1663 1664 method_call = (MethodCall)body; 1667 String methodName = method_call.getName(); 1668 1669 if( trace ) 1670 log.trace("pre methodName: " + methodName); 1671 1672 int idx = methodName.lastIndexOf('.'); 1673 String handlerName = methodName.substring(0, idx); 1674 String newMethodName = methodName.substring(idx + 1); 1675 1676 if( trace ) 1677 { 1678 log.trace("handlerName: " + handlerName + " methodName: " + newMethodName); 1679 log.trace("Handle: " + methodName); 1680 } 1681 1682 method_call.setName(newMethodName); 1684 Object handler = rpcHandlers.get(handlerName); 1685 if (handler == null) 1686 { 1687 if( trace ) 1688 log.debug("No rpc handler registered under: "+handlerName); 1689 return new NoHandlerForRPC(); 1690 } 1691 1692 1696 try 1697 { 1698 retval = method_call.invoke(handler); 1699 if( trace ) 1700 log.trace("rpc call return value: "+retval); 1701 } 1702 catch (Throwable t) 1703 { 1704 if( trace ) 1705 log.trace("rpc call threw exception", t); 1706 retval = t; 1707 } 1708 1709 return retval; 1710 } 1711 1712 } 1713 1714 private void setupLoggers(String partitionName) 1715 { 1716 if (partitionName == null) 1717 { 1718 this.log = Logger.getLogger(HAPartition.class.getName()); 1719 this.clusterLifeCycleLog = Logger.getLogger(HAPartition.class.getName() + ".lifecycle"); 1720 } 1721 else 1722 { 1723 this.log = Logger.getLogger(HAPartition.class.getName() + "." + partitionName); 1724 this.clusterLifeCycleLog = Logger.getLogger(HAPartition.class.getName() + ".lifecycle." + partitionName); 1725 } 1726 } 1727 1728} 1729 | Popular Tags |