1 16 17 package org.apache.catalina.cluster.tcp; 18 19 import java.beans.PropertyChangeSupport ; 20 import java.io.IOException ; 21 import java.net.URL ; 22 import java.util.HashMap ; 23 import java.util.Vector ; 24 25 import javax.management.MBeanServer ; 26 import javax.management.MBeanServerFactory ; 27 import javax.management.ObjectName ; 28 import javax.management.modelmbean.ModelMBean ; 29 30 import org.apache.catalina.Container; 31 import org.apache.catalina.Lifecycle; 32 import org.apache.catalina.LifecycleEvent; 33 import org.apache.catalina.LifecycleException; 34 import org.apache.catalina.LifecycleListener; 35 import org.apache.catalina.Manager; 36 import org.apache.catalina.Valve; 37 import org.apache.catalina.cluster.CatalinaCluster; 38 import org.apache.catalina.cluster.ClusterManager; 39 import org.apache.catalina.cluster.ClusterMessage; 40 import org.apache.catalina.cluster.Constants; 41 import org.apache.catalina.cluster.Member; 42 import org.apache.catalina.cluster.MembershipListener; 43 import org.apache.catalina.cluster.MembershipService; 44 import org.apache.catalina.cluster.MessageListener; 45 import org.apache.catalina.cluster.io.ListenCallback; 46 import org.apache.catalina.cluster.session.ReplicationStream; 47 import org.apache.catalina.cluster.session.SessionMessage; 48 import org.apache.catalina.core.StandardHost; 49 import org.apache.catalina.util.LifecycleSupport; 50 import org.apache.catalina.util.StringManager; 51 import org.apache.commons.logging.Log; 52 import org.apache.commons.modeler.ManagedBean; 53 import org.apache.commons.modeler.Registry; 54 import org.apache.tomcat.util.IntrospectionUtils; 55 56 74 75 public class SimpleTcpCluster implements CatalinaCluster, Lifecycle, 76 MembershipListener, ListenCallback, LifecycleListener { 77 78 public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 79 .getLog(SimpleTcpCluster.class); 80 81 83 86 protected static final String info = "SimpleTcpCluster/1.0"; 87 88 91 protected MembershipService membershipService = null; 92 93 96 protected boolean expireSessionsOnShutdown = true; 97 98 101 protected boolean printToScreen = false; 102 103 107 protected boolean useDirtyFlag = false; 108 109 112 protected String clusterImpName = "SimpleTcpCluster"; 113 114 117 protected StringManager sm = StringManager.getManager(Constants.Package); 118 119 122 protected String clusterName = null; 123 124 127 protected Container container = null; 128 129 132 protected LifecycleSupport lifecycle = new LifecycleSupport(this); 133 134 137 private MBeanServer mserver = null; 138 139 142 private Registry registry = null; 143 144 147 protected boolean started = false; 148 149 152 protected PropertyChangeSupport support = new PropertyChangeSupport (this); 153 154 157 protected HashMap managers = new HashMap (); 158 159 162 private long nrOfMsgsReceived = 0; 163 164 private long msgSendTime = 0; 165 166 private long lastChecked = System.currentTimeMillis(); 167 168 protected MemberComparator memberComparator = new MemberComparator(); 170 171 private String managerClassName = "org.apache.catalina.cluster.session.DeltaManager"; 172 173 176 private org.apache.catalina.cluster.ClusterSender clusterSender; 177 178 181 private org.apache.catalina.cluster.ClusterReceiver clusterReceiver; 182 183 private org.apache.catalina.Valve valve; 184 185 private org.apache.catalina.cluster.ClusterDeployer clusterDeployer; 186 187 190 protected Vector clusterListeners = new Vector (); 191 192 195 private boolean notifyListenersOnReplication = true; 196 197 private ObjectName objectName = null; 198 199 201 public SimpleTcpCluster() { 202 } 203 204 209 public String getInfo() { 210 return (info); 211 } 212 213 220 public void setClusterName(String clusterName) { 221 this.clusterName = clusterName; 222 } 223 224 230 public String getClusterName() { 231 return clusterName; 232 } 233 234 240 public void setContainer(Container container) { 241 Container oldContainer = this.container; 242 this.container = container; 243 support.firePropertyChange("container", oldContainer, this.container); 244 } 246 247 252 public Container getContainer() { 253 return (this.container); 254 } 255 256 265 public void setProtocol(String protocol) { 266 } 267 268 271 public String getProtocol() { 272 return null; 273 } 274 275 public Member[] getMembers() { 276 Member[] members = membershipService.getMembers(); 277 java.util.Arrays.sort(members, memberComparator); 279 return members; 280 } 281 282 287 public Member getLocalMember() { 288 return membershipService.getLocalMember(); 289 } 290 291 293 public synchronized Manager createManager(String name) { 294 if (log.isDebugEnabled()) 295 log.debug("Creating ClusterManager for context " + name 296 + " using class " + getManagerClassName()); 297 ClusterManager manager = null; 298 try { 299 manager = (ClusterManager) getClass().getClassLoader().loadClass( 300 getManagerClassName()).newInstance(); 301 } catch (Exception x) { 302 log.error("Unable to load class for replication manager", x); 303 manager = new org.apache.catalina.cluster.session.SimpleTcpReplicationManager(); 304 } 305 addManager(name, manager); 306 return manager; 307 } 308 309 313 public void removeManager(String name) { 314 managers.remove(name); 315 } 316 317 321 public void addManager(String name, ClusterManager manager) { 322 manager.setName(name); 323 manager.setCluster(this); 324 manager.setDistributable(true); 325 manager.setExpireSessionsOnShutdown(expireSessionsOnShutdown); 326 manager.setUseDirtyFlag(useDirtyFlag); 327 manager.setNotifyListenersOnReplication(notifyListenersOnReplication); 328 managers.put(name, manager); 329 } 330 331 public Manager getManager(String name) { 332 return (Manager) managers.get(name); 333 } 334 335 337 343 public void addLifecycleListener(LifecycleListener listener) { 344 lifecycle.addLifecycleListener(listener); 345 } 346 347 352 public void backgroundProcess() { 353 if (clusterDeployer != null) 354 clusterDeployer.backgroundProcess(); 355 356 } 357 358 362 public LifecycleListener[] findLifecycleListeners() { 363 364 return lifecycle.findLifecycleListeners(); 365 366 } 367 368 374 public void removeLifecycleListener(LifecycleListener listener) { 375 lifecycle.removeLifecycleListener(listener); 376 } 377 378 393 public void start() throws LifecycleException { 394 if (started) 395 throw new LifecycleException(sm.getString("cluster.alreadyStarted")); 396 if (log.isInfoEnabled()) 397 log.info("Cluster is about to start"); 398 try { 399 if (log.isDebugEnabled()) 400 log.debug("Invoking addValve on " + getContainer() 401 + " with class=" + valve.getClass().getName()); 402 if (valve != null) { 403 IntrospectionUtils.callMethodN(getContainer(), "addValve", 404 new Object [] { valve }, new Class [] { Thread 405 .currentThread().getContextClassLoader() 406 .loadClass("org.apache.catalina.Valve") }); 407 408 } 409 registerMBeans(); 410 clusterReceiver.setSendAck(clusterSender.isWaitForAck()); 411 clusterReceiver.setCatalinaCluster(this); 412 clusterReceiver.start(); 413 clusterSender.setCatalinaCluster(this); 414 clusterSender.start(); 415 membershipService.setLocalMemberProperties(clusterReceiver 416 .getHost(), clusterReceiver.getPort()); 417 membershipService.addMembershipListener(this); 418 membershipService.start(); 419 try { 421 if (clusterDeployer != null) { 422 clusterDeployer.setCluster(this); 423 clusterDeployer.start(); 424 } 425 } catch (Throwable x) { 426 log 427 .fatal( 428 "Unable to retrieve the container deployer. Cluster deployment disabled.", 429 x); 430 } this.started = true; 432 } catch (Exception x) { 433 log.error("Unable to start cluster.", x); 434 throw new LifecycleException(x); 435 } 436 } 437 438 444 public void send(ClusterMessage msg, Member dest) { 445 try { 446 msg.setAddress(membershipService.getLocalMember()); 447 Member destination = dest; 448 449 if (msg instanceof SessionMessage) { 450 SessionMessage smsg = (SessionMessage) msg; 451 if ((destination == null) 453 && (smsg.getEventType() == SessionMessage.EVT_GET_ALL_SESSIONS) 454 && (membershipService.getMembers().length > 0)) { 455 destination = membershipService.getMembers()[0]; 456 } 457 } 458 byte[] data = createMessageData(msg); 459 if (destination != null) { 460 Member tcpdest = dest; 461 if ((tcpdest != null) 462 && (!membershipService.getLocalMember().equals(tcpdest))) { 463 clusterSender.sendMessage(msg.getUniqueId(), data, tcpdest); 464 } 465 } else { 466 clusterSender.sendMessage(msg.getUniqueId(), data); 467 } 468 } catch (Exception x) { 469 log.error("Unable to send message through cluster sender.", x); 470 } 471 } 472 473 479 protected byte[] createMessageData(ClusterMessage msg) throws IOException { 480 msg.setTimestamp(System.currentTimeMillis()); 481 java.io.ByteArrayOutputStream outs = new java.io.ByteArrayOutputStream (); 482 java.io.ObjectOutputStream out = new java.io.ObjectOutputStream ( 483 outs); 484 out.writeObject(msg); 485 byte[] data = outs.toByteArray(); 486 return data; 487 } 488 489 494 public void send(ClusterMessage msg) { 495 send(msg, null); 496 } 497 498 512 public void stop() throws LifecycleException { 513 514 if (!started) 515 throw new IllegalStateException (sm.getString("cluster.notStarted")); 516 unregisterMBeans(); 517 518 membershipService.stop(); 519 membershipService.removeMembershipListener(); 520 try { 521 clusterSender.stop(); 522 } catch (Exception x) { 523 log.error("Unable to stop cluster sender.", x); 524 } 525 try { 526 clusterReceiver.stop(); 527 clusterReceiver.setCatalinaCluster(null); 528 } catch (Exception x) { 529 log.error("Unable to stop cluster receiver.", x); 530 } 531 if (clusterDeployer != null) { 532 clusterDeployer.stop(); 533 } 534 started = false; 535 } 536 537 541 public void memberAdded(Member member) { 542 try { 543 if (log.isInfoEnabled()) 544 log.info("Replication member added:" + member); 545 clusterSender.add(member); 546 } catch (Exception x) { 547 log.error("Unable to connect to replication system.", x); 548 } 549 550 } 551 552 556 public void memberDisappeared(Member member) { 557 if (log.isInfoEnabled()) 558 log.info("Received member disappeared:" + member); 559 try { 560 clusterSender.remove(member); 561 } catch (Exception x) { 562 log.error("Unable remove cluster node from replication system.", x); 563 } 564 565 } 566 567 public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) { 568 this.expireSessionsOnShutdown = expireSessionsOnShutdown; 569 } 570 571 public void setPrintToScreen(boolean printToScreen) { 572 this.printToScreen = printToScreen; 573 } 574 575 578 public long getMsgSendTime() { 579 return msgSendTime; 580 } 581 582 585 public long getLastChecked() { 586 return lastChecked; 587 } 588 589 592 public long getNrOfMsgsReceived() { 593 return nrOfMsgsReceived; 594 } 595 596 599 public boolean isExpireSessionsOnShutdown() { 600 return expireSessionsOnShutdown; 601 } 602 603 606 public boolean isPrintToScreen() { 607 return printToScreen; 608 } 609 610 613 public boolean isUseDirtyFlag() { 614 return useDirtyFlag; 615 } 616 617 public void setUseDirtyFlag(boolean useDirtyFlag) { 618 this.useDirtyFlag = useDirtyFlag; 619 } 620 621 public void messageDataReceived(byte[] data) { 622 long timeSent = System.currentTimeMillis(); 623 try { 624 ReplicationStream stream = new ReplicationStream( 625 new java.io.ByteArrayInputStream (data), getClass() 626 .getClassLoader()); 627 Object myobj = stream.readObject(); 628 if (log.isDebugEnabled() 629 && myobj != null && myobj instanceof ClusterMessage) 630 log.debug("Assuming clocks are synched: Replication for " 631 + ((ClusterMessage) myobj).getUniqueId() + " took=" 632 + (System.currentTimeMillis() - ((ClusterMessage) myobj).getTimestamp()) 633 + " ms."); 634 if (myobj != null && myobj instanceof SessionMessage) { 635 636 SessionMessage msg = (SessionMessage) myobj; 637 String ctxname = msg.getContextName(); 638 if (ctxname == null) { 641 java.util.Iterator i = managers.keySet().iterator(); 642 while (i.hasNext()) { 643 String key = (String ) i.next(); 644 ClusterManager mgr = (ClusterManager) managers.get(key); 645 if (mgr != null) 646 mgr.messageDataReceived(msg); 647 else { 648 if (log.isDebugEnabled()) 651 log.debug("Context manager doesn't exist:" 652 + key); 653 } 654 } 655 } else { 656 ClusterManager mgr = (ClusterManager) managers.get(ctxname); 657 if (mgr != null) 658 mgr.messageDataReceived(msg); 659 else if (log.isWarnEnabled()) 660 log.warn("Context manager doesn't exist:" + ctxname); 661 } 662 } else { 663 for (int i = 0; i < clusterListeners.size(); i++) { 665 MessageListener listener = (MessageListener) clusterListeners 666 .elementAt(i); 667 if (myobj != null && myobj instanceof ClusterMessage 668 && listener.accept((ClusterMessage) myobj)) { 669 listener.messageReceived((ClusterMessage) myobj); 670 } else 671 if(log.isDebugEnabled()) 672 log.debug("Message " + myobj.toString() 673 + " from type " + myobj.getClass().getName() 674 + " transfered but no listener registered"); 675 } 676 } 677 678 } catch (Exception x) { 679 log.error("Unable to deserialize session message.", x); 680 } finally { 681 perfMessageRecvd(timeSent); 682 } 683 } 684 685 public void lifecycleEvent(LifecycleEvent lifecycleEvent) { 686 if (log.isDebugEnabled()) 687 log.debug("\nlifecycleEvent\n\nType" + lifecycleEvent.getType() 688 + "\nData" + lifecycleEvent.getData() + "\n\n\n"); 689 } 690 691 710 public void startContext(String contextPath) throws IOException { 711 return; 712 } 713 714 743 public void installContext(String contextPath, URL war) { 744 if (log.isDebugEnabled()) 745 log.debug("\n\n\n\nCluster Install called for context:" 746 + contextPath + "\n\n\n\n"); 747 } 748 749 766 public void stop(String contextPath) throws IOException { 767 return; 768 } 769 770 public Log getLogger() { 771 return log; 772 } 773 774 776 778 private void perfMessageRecvd(long timeSent) { 779 nrOfMsgsReceived++; 780 long current = System.currentTimeMillis(); 781 msgSendTime += (current - timeSent); 782 if (log.isDebugEnabled()) { 783 if ((current - lastChecked) > 5000) { 784 log.debug("Calc msg send time total=" + msgSendTime 785 + "ms num request=" + nrOfMsgsReceived 786 + " average per msg=" 787 + (msgSendTime / nrOfMsgsReceived) + "ms."); 788 lastChecked=current ; 789 } 790 } 791 } 792 793 public String getManagerClassName() { 794 return managerClassName; 795 } 796 797 public void setManagerClassName(String managerClassName) { 798 this.managerClassName = managerClassName; 799 } 800 801 public org.apache.catalina.cluster.ClusterSender getClusterSender() { 802 return clusterSender; 803 } 804 805 public void setClusterSender( 806 org.apache.catalina.cluster.ClusterSender clusterSender) { 807 this.clusterSender = clusterSender; 808 } 809 810 public org.apache.catalina.cluster.ClusterReceiver getClusterReceiver() { 811 return clusterReceiver; 812 } 813 814 public void setClusterReceiver( 815 org.apache.catalina.cluster.ClusterReceiver clusterReceiver) { 816 this.clusterReceiver = clusterReceiver; 817 } 818 819 public MembershipService getMembershipService() { 820 return membershipService; 821 } 822 823 public void setMembershipService(MembershipService membershipService) { 824 this.membershipService = membershipService; 825 } 826 827 public void addValve(Valve valve) { 828 this.valve = valve; 829 } 830 831 public Valve getValve() { 832 return valve; 833 } 834 835 public void addClusterListener(MessageListener listener) { 836 if (!clusterListeners.contains(listener)) { 837 clusterListeners.addElement(listener); 838 } 839 } 840 841 public void removeClusterListener(MessageListener listener) { 842 clusterListeners.removeElement(listener); 843 } 844 845 public org.apache.catalina.cluster.ClusterDeployer getClusterDeployer() { 846 return clusterDeployer; 847 } 848 849 public void setClusterDeployer( 850 org.apache.catalina.cluster.ClusterDeployer clusterDeployer) { 851 this.clusterDeployer = clusterDeployer; 852 } 853 854 public boolean getNotifyListenersOnReplication() { 855 return notifyListenersOnReplication; 856 } 857 858 public void setNotifyListenersOnReplication( 859 boolean notifyListenersOnReplication) { 860 this.notifyListenersOnReplication = notifyListenersOnReplication; 861 } 862 863 865 871 protected void registerMBeans() { 872 try { 873 getMBeanServer(); 874 String domain = mserver.getDefaultDomain(); 875 String name = ":type=Cluster"; 876 if (container instanceof StandardHost) { 877 domain = ((StandardHost) container).getDomain(); 878 name += ",host=" + container.getName(); 879 } 880 ObjectName clusterName = new ObjectName (domain + name); 881 882 if (mserver.isRegistered(clusterName)) { 883 if (log.isWarnEnabled()) 884 log.warn(sm.getString("cluster.mbean.register.allready", 885 clusterName)); 886 return; 887 } 888 setObjectName(clusterName); 889 mserver.registerMBean(getManagedBean(this), getObjectName()); 890 } catch (Exception ex) { 891 log.error(ex.getMessage(), ex); 892 } 893 } 894 895 protected void unregisterMBeans() { 896 if (mserver != null) { 897 try { 898 mserver.unregisterMBean(getObjectName()); 899 } catch (Exception e) { 900 log.error(e); 901 } 902 } 903 } 904 905 911 protected MBeanServer getMBeanServer() throws Exception { 912 if (mserver == null) { 913 if (MBeanServerFactory.findMBeanServer(null).size() > 0) { 914 mserver = (MBeanServer ) MBeanServerFactory 915 .findMBeanServer(null).get(0); 916 } else { 917 mserver = MBeanServerFactory.createMBeanServer(); 918 } 919 registry = Registry.getRegistry(null, null); 920 registry.loadMetadata(this.getClass() 921 .getResourceAsStream("mbeans-descriptors.xml")); 922 } 923 return (mserver); 924 } 925 926 935 protected ModelMBean getManagedBean(Object object) throws Exception { 936 ModelMBean mbean = null; 937 if (registry != null) { 938 ManagedBean managedBean = registry.findManagedBean(object 939 .getClass().getName()); 940 mbean = managedBean.createMBean(object); 941 } 942 return mbean; 943 } 944 945 public void setObjectName(ObjectName name) { 946 objectName = name ; 947 } 948 949 public ObjectName getObjectName() { 950 return objectName; 951 } 952 953 955 private class MemberComparator implements java.util.Comparator { 956 957 public int compare(Object o1, Object o2) { 958 try { 959 return compare((Member) o1, (Member) o2); 960 } catch (ClassCastException x) { 961 return 0; 962 } 963 } 964 965 public int compare(Member m1, Member m2) { 966 long result = m2.getMemberAliveTime() - m1.getMemberAliveTime(); 968 if (result < 0) 969 return -1; 970 else if (result == 0) 971 return 0; 972 else 973 return 1; 974 } 975 } 976 977 } | Popular Tags |