1 7 package org.jboss.dtf; 8 9 import junit.framework.TestCase; 10 import org.jgroups.Address; 11 import org.jgroups.Channel; 12 import org.jgroups.ChannelException; 13 import org.jgroups.JChannel; 14 import org.jgroups.MembershipListener; 15 import org.jgroups.View; 16 import org.jgroups.blocks.GroupRequest; 17 import org.jgroups.blocks.MethodCall; 18 import org.jgroups.blocks.RpcDispatcher; 19 20 import java.util.Vector ; 21 22 32 public class DistributedTest 33 extends TestCase 34 implements MembershipListener 35 { 36 private int parties = 2; 38 private Channel channel; 39 private RpcDispatcher disp; 40 private Address localAddress; 41 private String props = "UDP(mcast_recv_buf_size=64000;mcast_send_buf_size=32000;" + 42 "mcast_port=45566;use_packet_handler=false;ucast_recv_buf_size=64000;" + 43 "mcast_addr=228.8.8.8;loopback=false;ucast_send_buf_size=32000;ip_ttl=32):" + 44 "PING(timeout=2000;num_initial_members=3):" + 45 "MERGE2(max_interval=10000;min_interval=5000):" + 46 "FD(timeout=2000;max_tries=3;shun=true):" + 47 "VERIFY_SUSPECT(timeout=1500):" + 48 "pbcast.NAKACK(max_xmit_size=8192;gc_lag=50;retransmit_timeout=600,1200,2400,4800):" + 49 "UNICAST(timeout=1200,2400,3600):" + 50 "pbcast.STABLE(desired_avg_gossip=20000):" + 51 "FRAG(frag_size=8192;down_thread=false;up_thread=false):" + 52 "pbcast.GMS(print_local_addr=true;join_timeout=3000;join_retry_timeout=2000;shun=true)"; 53 54 55 private long startupTimeout = 20000; 57 private long shutdownTimeout = 120000; 60 61 private int shutdownCount; 62 63 private boolean startupWaitFlag = false; 64 private boolean shutdownWaitFlag = false; 65 66 private final Object waitObj = new Object (); 67 68 public DistributedTest(String name) 69 { 70 super(name); 71 parties = Integer.getInteger("jboss.test.distributed.instancecount", 2).intValue(); 72 } 73 74 public int getNumberOfInstances() 75 { 76 return parties; 77 } 78 79 public long getShutdownTimeout() 80 { 81 return shutdownTimeout; 82 } 83 84 protected void setShutdownTimeout(long timeout) 85 { 86 this.shutdownTimeout = timeout; 87 } 88 89 protected void settUp() throws Exception 90 { 91 shutdownCount = parties; 92 startupWaitFlag = true; 93 sendStartupNotification(); 94 95 long startTime = System.currentTimeMillis(); 96 while(startupWaitFlag) 97 { 98 try 99 { 100 synchronized(waitObj) 101 { 102 waitObj.wait(1000); 103 } 104 105 if(timeoutExpired(startTime, startupTimeout)) 106 { 107 break; 108 } 109 } 110 catch(InterruptedException e) 111 { 112 break; 113 } 114 } 115 116 if(startupWaitFlag) 117 { 118 disp.stop(); 120 channel.disconnect(); 121 throw new Exception ("Timed out waiting for other instances to start."); 122 } 123 } 124 125 protected void shutDown() throws Exception 126 { 127 shutdownWaitFlag = true; 128 sendShutdownNotification(); 129 130 long startTime = System.currentTimeMillis(); 131 while(shutdownWaitFlag) 132 { 133 try 134 { 135 Thread.sleep(1000); 137 if(timeoutExpired(startTime, shutdownTimeout)) 138 { 139 break; 140 } 141 } 142 catch(InterruptedException e) 143 { 144 } 145 } 146 147 if(shutdownWaitFlag) 148 { 149 throw new Exception ("Timed out waiting for other instances to stop."); 151 } 152 disp.stop(); 153 channel.disconnect(); 154 } 155 156 private boolean timeoutExpired(long startTime, long timeout) 157 { 158 long duration = System.currentTimeMillis() - startTime; 159 if(duration > timeout) 160 { 161 return true; 162 } 163 else 164 { 165 return false; 166 } 167 } 168 169 private void sendStartupNotification() throws ChannelException 170 { 171 channel = new JChannel(props); 173 disp = new RpcDispatcher(channel, null, this, this); 174 channel.connect("DistributedTestCase"); 175 localAddress = channel.getLocalAddress(); 176 } 177 178 private void sendShutdownNotification() 179 { 180 MethodCall call = new MethodCall("receiveShutdownNotification", 181 new Object []{localAddress}, new Class []{Address.class}); 182 disp.callRemoteMethods(null, call, GroupRequest.GET_NONE, 0); 183 } 184 185 public void viewAccepted(View view) 186 { 187 Vector members = view.getMembers(); 189 int numOfMembers = members.size(); 190 if(numOfMembers >= parties && startupWaitFlag) { 192 startupWaitFlag = false; 193 synchronized(waitObj) 194 { 195 waitObj.notify(); 196 } 197 } 198 } 199 200 public void receiveShutdownNotification(Address address) 201 { 202 if(--shutdownCount == 0 && shutdownWaitFlag) { 204 shutdownWaitFlag = false; 205 } 206 } 207 208 public void suspect(Address address) 209 { 210 } 211 212 public void block() 213 { 214 } 215 } 216 | Popular Tags |