package org.jgroups.tests;


import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.jgroups.*;
import org.jgroups.util.Util;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;


/**
 * Starts {@link #NUM} channels at the same time, waits until every one of them has installed
 * a view containing all {@link #NUM} members, verifies the view sizes and then shuts all
 * channels down again.
 * <p>
 * The main test thread and the {@link #NUM} worker threads are kept in lock step by four
 * {@link CyclicBarrier}s, each created for {@code NUM+1} parties. All barrier waits and the
 * worker's wait for the full view are bounded by {@link #TIMEOUT}, so a failing worker makes
 * the test fail instead of hanging it.
 * <p>
 * NOTE(review): the stack is configured with {@code handle_concurrent_startup=false} and
 * MERGE2; presumably the concurrently started members first form separate groups which then
 * have to be merged - confirm against the JGroups protocol documentation.
 */
public class MergeStressTest extends TestCase {
    /** Released when the main thread and all workers are ready to connect. */
    static CyclicBarrier    start_connecting=null;
    /** Released when every worker has seen a view of NUM members. */
    static CyclicBarrier    received_all_views=null;
    /** Released when the main thread has finished checking the views. */
    static CyclicBarrier    start_disconnecting=null;
    /** Released when every worker has shut its channel down. */
    static CyclicBarrier    disconnected=null;
    /** Number of worker threads (and therefore channels / expected members). */
    static final int        NUM=10;
    /** Upper bound, in milliseconds, for every blocking step of the test. */
    static final long       TIMEOUT=50000;
    static final MyThread[] threads=new MyThread[NUM];
    static String           groupname="ConcurrentTestDemo";


    static String props="UDP(mcast_addr=228.8.8.9;mcast_port=7788;ip_ttl=1;" +
            "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +
            "PING(timeout=3000;num_initial_members=3):" +
            "MERGE2(min_interval=3000;max_interval=5000):" +
            "FD_SOCK:" +
            "VERIFY_SUSPECT(timeout=1500):" +
            "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):" +
            "UNICAST(timeout=300,600,1200,2400):" +
            "pbcast.STABLE(desired_avg_gossip=5000):" +
            "FRAG(frag_size=4096):" +
            "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
            "shun=false;print_local_addr=false;view_ack_collection_timeout=5000;" +
            "merge_timeout=30000;handle_concurrent_startup=false)";



    public MergeStressTest(String name) {
        super(name);
    }


    /** Prints {@code msg} to stdout, prefixed with the name of the calling thread. */
    static void log(String msg) {
        System.out.println("-- [" + Thread.currentThread().getName() + "] " + msg);
    }


    /**
     * Waits on {@code barrier} for at most {@link #TIMEOUT} milliseconds.
     * <p>
     * A timed wait is used on purpose: when it expires the barrier is broken, which wakes up
     * every other party waiting on it and makes later arrivals fail immediately, so no thread
     * stays blocked because one participant died.
     *
     * @throws Exception if the wait timed out, was interrupted, or the barrier is broken
     */
    static void awaitBarrier(CyclicBarrier barrier) throws Exception {
        barrier.await(TIMEOUT, TimeUnit.MILLISECONDS);
    }


    /**
     * Runs the scenario described in the class comment and asserts that every channel ends up
     * with a view of {@link #NUM} members.
     *
     * @throws Exception if a barrier wait times out or is broken (e.g. because a worker failed)
     */
    public void testConcurrentStartupAndMerging() throws Exception {
        start_connecting=new CyclicBarrier(NUM+1);
        received_all_views=new CyclicBarrier(NUM+1);
        start_disconnecting=new CyclicBarrier(NUM+1);
        disconnected=new CyclicBarrier(NUM+1);

        for(int i=0; i < threads.length; i++) {
            threads[i]=new MyThread(i);
            threads[i].start();
        }

        boolean views_received=false; // true once all workers passed received_all_views
        boolean passed=false;         // true once all assertions succeeded
        try {
            Util.sleep(1000);
            awaitBarrier(start_connecting);
            long start=System.currentTimeMillis();

            awaitBarrier(received_all_views);
            views_received=true;
            long stop=System.currentTimeMillis();
            System.out.println("-- took " + (stop-start) + " msecs for all " + NUM + " threads to see all views");

            System.out.print("checking that all views have " + NUM + " members: ");
            for(int i=0; i < threads.length; i++) {
                // JUnit convention: expected value first, actual value second
                assertEquals(NUM, threads[i].numMembers());
            }
            System.out.println("SUCCESSFUL");
            passed=true;
        }
        finally {
            try {
                // The workers can only be waiting on the teardown barriers if the views barrier
                // was passed (it needs the main thread as its last party). Otherwise skip the
                // handshake instead of waiting for threads that will never arrive.
                if(views_received) {
                    awaitBarrier(start_disconnecting);
                    awaitBarrier(disconnected);
                }
            }
            catch(Exception e) {
                if(passed)
                    throw e; // nothing else went wrong: a broken teardown is the failure
                // do not mask the primary failure that is already propagating
                log("teardown failed: " + e);
            }
            finally {
                // last line of defence against leaked channels
                // NOTE(review): relies on JChannel.close() being safe on a channel that was
                // already shut down or closed by its worker - confirm for the JGroups version used
                for(int i=0; i < threads.length; i++) {
                    if(threads[i] != null)
                        threads[i].closeChannel();
                }
            }
        }
    }





    /**
     * One participant of the test: owns a thread and, once running, a channel for which it is
     * also the receiver. See {@link #run()} for the sequence of steps.
     */
    public static class MyThread extends ReceiverAdapter implements Runnable {
        int                       index=-1;
        long                      total_connect_time=0, total_disconnect_time=0;
        /** Written by the worker thread, read by the main thread; hence volatile. */
        private volatile JChannel ch=null;
        private Address           my_addr=null;
        private View              current_view;
        private Thread            thread;
        /** Size of the most recently accepted view; guarded by {@code this}. */
        private int               num_members=0;



        public MyThread(int i) {
            thread=new Thread(this, "thread #" + i);
            index=i;
        }

        /** Starts the worker thread. */
        public void start() {
            thread.start();
        }

        /** Closes the channel if one has been created; does nothing otherwise. */
        public void closeChannel() {
            JChannel channel=ch;
            if(channel != null) {
                channel.close();
            }
        }

        /**
         * @return the number of members in the channel's current view, or 0 if the channel
         *         has not been created or has no view
         */
        public int numMembers() {
            JChannel channel=ch;
            View view=channel != null? channel.getView() : null;
            return view != null? view.size() : 0;
        }


        /**
         * Records the new view (logging it if it differs from the previous one) and wakes up
         * {@link #run()} once the view contains {@link MergeStressTest#NUM} members.
         */
        public void viewAccepted(View new_view) {
            String type=new_view instanceof MergeView? "merge view" : "view";
            if(current_view == null || !current_view.equals(new_view)) {
                current_view=new_view;
                log(type + " accepted: " + current_view.getVid() + " :: " + current_view.getMembers());
            }

            int size=current_view.getMembers().size();
            synchronized(this) {
                // update under the same monitor the waiter uses to check the condition
                num_members=size;
                if(size == NUM) {
                    this.notifyAll();
                }
            }
        }


        /**
         * Blocks until a view with {@link MergeStressTest#NUM} members has been accepted.
         *
         * @return the member count that satisfied the condition
         * @throws TimeoutException     if the view is not complete within
         *                              {@link MergeStressTest#TIMEOUT} milliseconds
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        private synchronized int waitForAllMembers() throws InterruptedException, TimeoutException {
            long deadline=System.currentTimeMillis() + TIMEOUT;
            while(num_members < NUM) {
                long remaining=deadline - System.currentTimeMillis();
                if(remaining <= 0)
                    throw new TimeoutException("only " + num_members + " of " + NUM +
                            " members after " + TIMEOUT + " msecs");
                this.wait(remaining);
            }
            return num_members;
        }


        /**
         * Worker body: waits for the start signal, creates and connects the channel, waits for
         * the complete view, then - in step with the main thread - shuts the channel down.
         * Any exception is printed and ends the worker; the channel is closed in all cases.
         */
        public void run() {
            try {
                awaitBarrier(start_connecting);
                JChannel channel=new JChannel(props);
                ch=channel;
                channel.setReceiver(this);
                log("connecting to channel");
                long start=System.currentTimeMillis(), stop;
                channel.connect(groupname);
                stop=System.currentTimeMillis();
                total_connect_time=stop-start;
                View view=channel.getView();
                my_addr=channel.getLocalAddress();
                log(my_addr + " connected in " + total_connect_time + " msecs (" +
                        view.getMembers().size() + " members). VID=" + channel.getView());

                int members=waitForAllMembers();

                log("reached " + members + " members");
                awaitBarrier(received_all_views);

                awaitBarrier(start_disconnecting);
                start=System.currentTimeMillis();
                channel.shutdown();
                stop=System.currentTimeMillis();

                log(my_addr + " shut down in " + (stop-start) + " msecs");
                awaitBarrier(disconnected);
            }
            catch(Exception e) {
                // The main thread notices the failure through the broken / timed-out barrier.
                e.printStackTrace();
            }
            finally {
                // release the channel on every path, including failures before shutdown()
                // NOTE(review): relies on close() being a no-op after shutdown() - confirm
                closeChannel();
            }
        }


    }


    public static Test suite() {
        return new TestSuite(MergeStressTest.class);
    }

    public static void main(String[] args) {
        String[] testCaseName={MergeStressTest.class.getName()};
        junit.textui.TestRunner.main(testCaseName);
    }


}