1 3 package org.jgroups.tests; 4 5 6 import junit.framework.TestCase; 7 import junit.framework.Test; 8 import junit.framework.TestSuite; 9 import org.jgroups.*; 10 import org.jgroups.util.Util; 11 12 import java.util.Vector ; 13 import java.util.concurrent.CyclicBarrier ; 14 import java.util.concurrent.BrokenBarrierException ; 15 16 17 22 public class ConnectStressTest extends TestCase { 23 static CyclicBarrier start_connecting=null; 24 static CyclicBarrier connected=null; 25 static CyclicBarrier received_all_views=null; 26 static CyclicBarrier start_disconnecting=null; 27 static CyclicBarrier disconnected=null; 28 static final int NUM=30; 29 static final MyThread[] threads=new MyThread[NUM]; 30 static JChannel channel=null; 31 static String groupname="ConcurrentTestDemo"; 32 33 34 static String props="udp.xml"; 35 36 37 38 public ConnectStressTest(String name) { 39 super(name); 40 41 } 42 43 44 static void log(String msg) { 45 System.out.println("-- [" + Thread.currentThread().getName() + "] " + msg); 46 } 47 48 49 public void testConcurrentJoins() throws Exception { 50 start_connecting=new CyclicBarrier (NUM +1); 51 connected=new CyclicBarrier (NUM +1); 52 received_all_views=new CyclicBarrier (NUM +1); 53 start_disconnecting=new CyclicBarrier (NUM +1); 54 disconnected=new CyclicBarrier (NUM +1); 55 56 long start, stop; 57 58 channel=new JChannel(props); 60 channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE); 61 start=System.currentTimeMillis(); 62 channel.connect(groupname); 63 stop=System.currentTimeMillis(); 64 log(channel.getLocalAddress() + " connected in " + (stop-start) + " msecs (" + 65 channel.getView().getMembers().size() + " members). VID=" + channel.getView().getVid()); 66 assertEquals(1, channel.getView().getMembers().size()); 67 68 for(int i=0; i < threads.length; i++) { 69 threads[i]=new MyThread(i); 70 threads[i].start(); 71 } 72 73 start_connecting.await(); 75 start=System.currentTimeMillis(); 76 77 try { 78 connected.await(); 79 stop=System.currentTimeMillis(); 80 System.out.println("-- took " + (stop-start) + " msecs for all " + NUM + " threads to connect"); 81 82 received_all_views.await(); 83 stop=System.currentTimeMillis(); 84 System.out.println("-- took " + (stop-start) + " msecs for all " + NUM + " threads to see all views"); 85 86 int num_members=-1; 87 for(int i=0; i < 10; i++) { 88 View v=channel.getView(); 89 num_members=v.getMembers().size(); 90 System.out.println("*--* number of members connected: " + num_members + ", (expected: " +(NUM+1) + 91 "), v=" + v); 92 if(num_members == NUM+1) 93 break; 94 Util.sleep(500); 95 } 96 assertEquals((NUM+1), num_members); 97 } 98 catch(Exception ex) { 99 fail(ex.toString()); 100 } 101 } 102 103 public void testConcurrentLeaves() throws Exception { 104 start_disconnecting.await(); 105 long start, stop; 106 start=System.currentTimeMillis(); 107 108 disconnected.await(); 109 stop=System.currentTimeMillis(); 110 System.out.println("-- took " + (stop-start) + " msecs for " + NUM + " threads to disconnect"); 111 112 int num_members=0; 113 for(int i=0; i < 10; i++) { 114 View v=channel.getView(); 115 Vector mbrs=v != null? v.getMembers() : null; 116 if(mbrs != null) { 117 num_members=mbrs.size(); 118 System.out.println("*--* number of members connected: " + num_members + ", (expected: 1), view=" + v); 119 if(num_members <= 1) 120 break; 121 } 122 Util.sleep(3000); 123 } 124 assertEquals(1, num_members); 125 log("closing all channels"); 126 for(int i=0; i < threads.length; i++) { 127 MyThread t=threads[i]; 128 t.closeChannel(); 129 } 130 channel.close(); 131 } 132 133 134 135 136 public static class MyThread extends Thread { 137 int index=-1; 138 long total_connect_time=0, total_disconnect_time=0; 139 private JChannel ch=null; 140 private Address my_addr=null; 141 142 public MyThread(int i) { 143 super("thread #" + i); 144 index=i; 145 } 146 147 public void closeChannel() { 148 if(ch != null) { 149 ch.close(); 150 } 151 } 152 153 154 public void run() { 155 View view; 156 157 try { 158 ch=new JChannel(props); 159 160 start_connecting.await(); 161 162 long start=System.currentTimeMillis(), stop; 163 ch.connect(groupname); 164 stop=System.currentTimeMillis(); 165 total_connect_time=stop-start; 166 view=ch.getView(); 167 my_addr=ch.getLocalAddress(); 168 log(my_addr + " connected in " + total_connect_time + " msecs (" + 169 view.getMembers().size() + " members). VID=" + view.getVid()); 170 171 connected.await(); 172 173 int num_members=0; 174 while(true) { 175 View v=ch.getView(); 176 Vector mbrs=v != null? v.getMembers() : null; 177 if(mbrs == null) { 178 System.err.println("mbrs is null, v=" + v); 179 } 180 else { 181 num_members=mbrs.size(); 182 log("num_members=" + num_members); 183 if(num_members == NUM+1) break; 185 } 186 Util.sleep(2000); 187 } 188 log("reached " + num_members + " members"); 189 received_all_views.await(); 190 191 start_disconnecting.await(); 192 start=System.currentTimeMillis(); 193 ch.disconnect(); 194 stop=System.currentTimeMillis(); 195 196 log(my_addr + " disconnected in " + (stop-start) + " msecs"); 197 disconnected.await(); 198 } 199 catch(BrokenBarrierException e) { 200 e.printStackTrace(); 201 } 202 catch(ChannelException e) { 203 e.printStackTrace(); 204 } 205 catch(InterruptedException e) { 206 e.printStackTrace(); 207 } 208 } 209 210 211 } 212 213 214 public static Test suite() { 215 TestSuite s=new TestSuite(); 216 s.addTest(new ConnectStressTest("testConcurrentJoins")); 218 s.addTest(new ConnectStressTest("testConcurrentLeaves")); 219 return s; 220 } 221 222 public static void main(String [] args) { 223 String [] testCaseName={ConnectStressTest.class.getName()}; 224 junit.textui.TestRunner.main(testCaseName); 225 } 226 227 228 } 229 | Popular Tags |