1 5 package com.tctest; 6 7 import org.apache.commons.io.FileUtils; 8 9 import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier; 10 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 11 12 import com.tc.cluster.ClusterEventListener; 13 import com.tc.object.bytecode.ManagerUtil; 14 import com.tc.objectserver.control.ExtraL1ProcessControl; 15 import com.tc.simulator.app.ApplicationConfig; 16 import com.tc.simulator.listener.ListenerProvider; 17 18 import java.io.File ; 19 import java.util.HashSet ; 20 21 public class ClusterMembershipEventTestApp extends ServerCrashingAppBase implements ClusterEventListener { 22 23 public ClusterMembershipEventTestApp(String appId, ApplicationConfig config, ListenerProvider listenerProvider) { 24 super(appId, config, listenerProvider); 25 } 26 27 private final int initialNodeCount = getParticipantCount(); 28 private final CyclicBarrier barrier = new CyclicBarrier(initialNodeCount); 29 30 private final SynchronizedInt nodeConCnt = new SynchronizedInt(0); 32 private final SynchronizedInt nodeDisCnt = new SynchronizedInt(0); 33 private final SynchronizedInt thisNodeConCnt = new SynchronizedInt(0); 34 private final SynchronizedInt thisNodeDisCnt = new SynchronizedInt(0); 35 private final HashSet nodes = new HashSet (); 36 private String thisNode; 37 38 public void run() { 39 try { 40 runTest(); 41 } catch (Throwable t) { 42 notifyError(t); 43 } 44 } 45 46 private void runTest() throws Throwable { 47 ManagerUtil.addClusterEventListener(this); 48 check(1, thisNodeConCnt.get(), "thisNodeConnected"); 49 waitForNodes(initialNodeCount); 50 51 System.err.println("### stage 1 [all nodes connected]: thisNode=" + thisNode + ", threadId=" 52 + Thread.currentThread().getName()); 53 54 clearCounters(); 55 final boolean isMasterNode = barrier.barrier() == 0; 56 if (isMasterNode) { 57 System.err.println("### masterNode=" + thisNode + " -> crashing server..."); 58 getConfig().getServerControl().crash(); 59 System.err.println("### masterNode=" + thisNode + " -> crashed server"); 60 System.err.println("### masterNode=" + thisNode + " -> restarting server..."); 61 getConfig().getServerControl().start(30 * 1000); 62 System.err.println("### masterNode=" + thisNode + " -> restarted server"); 63 } 64 System.err.println("### stage 2 [reconnecting]: thisNode=" + thisNode + ", threadId=" 65 + Thread.currentThread().getName()); 66 barrier.barrier(); 67 waitForNodes(initialNodeCount); 68 check(1, thisNodeDisCnt.get(), "thisNodeDisconnected"); 69 check(1, thisNodeConCnt.get(), "thisNodeConnected"); 70 71 clearCounters(); 72 check(0, nodeConCnt.get(), "nodeConnected"); 73 check(0, nodeDisCnt.get(), "nodeDisconnected"); 74 barrier.barrier(); 75 System.err.println("### stage 3 [reconnected]: thisNode=" + thisNode + ", threadId=" 76 + Thread.currentThread().getName()); 77 78 if (isMasterNode) { 79 spawnNewClient(); 81 } 82 barrier.barrier(); 83 System.err.println("### stage 4 [new client disconnected]: thisNode=" + thisNode + ", threadId=" 84 + Thread.currentThread().getName()); 85 86 waitForNodes(initialNodeCount); 87 check(1, nodeConCnt.get(), "nodeConnected"); 88 check(1, nodeDisCnt.get(), "nodeDisconnected"); 89 clearCounters(); 90 barrier.barrier(); 91 System.err.println("### stage 5 [all done]: thisNode=" + thisNode + ", threadId=" 92 + Thread.currentThread().getName()); 93 } 94 95 private void clearCounters() { 96 this.nodeConCnt.set(0); 97 this.nodeDisCnt.set(0); 98 this.thisNodeConCnt.set(0); 99 this.thisNodeDisCnt.set(0); 100 } 101 102 private void waitForNodes(int expectedSize) { 103 while (true) { 104 synchronized (nodes) { 105 if (nodes.size() == expectedSize) break; 106 try { 107 nodes.wait(); 108 } catch (InterruptedException e) { 109 notifyError(e); 110 } 111 } 112 } 113 } 114 115 private void check(int expectedMin, int actual, String msg) { 116 if (actual < expectedMin) notifyError(msg + " expectedMin=" + expectedMin + ", actual=" + actual + ", thisNodeId=" 119 + thisNode); 120 } 121 122 public void nodeConnected(String nodeId) { 123 new Throwable ("### TRACE: ClusterMembershipEventTestApp.nodeConnected()").printStackTrace(); 124 nodeConCnt.increment(); 125 System.err.println("\n### nodeConnected: thisNode=" + thisNode + ", nodeId=" + nodeId + ", threadId=" 126 + Thread.currentThread().getName() + ", cnt=" + nodeConCnt.get()); 127 synchronized (nodes) { 128 nodes.add(nodeId); 129 nodes.notifyAll(); 130 } 131 } 132 133 public void nodeDisconnected(String nodeId) { 134 new Throwable ("### TRACE: ClusterMembershipEventTestApp.nodeDisconnected()").printStackTrace(); 135 nodeDisCnt.increment(); 136 System.err.println("\n### nodeDisconnected: thisNode=" + thisNode + ", nodeId=" + nodeId + ", threadId=" 137 + Thread.currentThread().getName() + ", cnt=" + nodeDisCnt.get()); 138 synchronized (nodes) { 139 nodes.remove(nodeId); 140 nodes.notifyAll(); 141 } 142 } 143 144 public void thisNodeConnected(String thisNodeId, String [] nodesCurrentlyInCluster) { 145 new Throwable ("### TRACE: ClusterMembershipEventTestApp.thisNodeConnected()").printStackTrace(); 146 thisNodeConCnt.increment(); 147 thisNode = thisNodeId; 148 System.err.println("\n### thisNodeConnected->thisNodeId=" + thisNodeId + ", threadId=" 149 + Thread.currentThread().getName() + ", cnt=" + thisNodeConCnt.get()); 150 synchronized (nodes) { 151 nodes.add(thisNode); 152 for (int i = 0; i < nodesCurrentlyInCluster.length; i++) { 153 nodes.add(nodesCurrentlyInCluster[i]); 154 } 155 nodes.notifyAll(); 156 } 157 } 158 159 public void thisNodeDisconnected(String thisNodeId) { 160 new Throwable ("### TRACE: ClusterMembershipEventTestApp.thisNodeDisconnected()").printStackTrace(); 161 thisNodeDisCnt.increment(); 162 System.err.println("\n### thisNodeDisconnected->thisNodeId=" + thisNodeId + ", threadId=" 163 + Thread.currentThread().getName() + ", cnt=" + thisNodeDisCnt.get()); 164 synchronized (nodes) { 165 nodes.clear(); 166 nodes.notifyAll(); 167 } 168 } 169 170 public static class L1Client { 171 public static void main(String args[]) { 172 } 174 } 175 176 private ExtraL1ProcessControl spawnNewClient() throws Exception { 177 final String hostName = getHostName(); 178 final int port = getPort(); 179 final File configFile = new File (getConfigFilePath()); 180 File workingDir = new File (configFile.getParentFile(), "client-0"); 181 FileUtils.forceMkdir(workingDir); 182 183 ExtraL1ProcessControl client = new ExtraL1ProcessControl(hostName, port, L1Client.class, configFile 184 .getAbsolutePath(), new String [0], workingDir); 185 client.start(20000); 186 client.mergeSTDERR(); 187 client.mergeSTDOUT(); 188 client.waitFor(); 189 System.err.println("\n### Started New Client"); 190 return client; 191 } 192 193 } 194
| Popular Tags
|