KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > MergeStressTest


1 // $Id: MergeStressTest.java,v 1.6 2007/03/12 11:03:06 belaban Exp $
2

3 package org.jgroups.tests;
4
5
6 import junit.framework.Test;
7 import junit.framework.TestCase;
8 import junit.framework.TestSuite;
9 import org.jgroups.*;
10 import org.jgroups.util.Util;
11
12 import java.util.concurrent.CyclicBarrier JavaDoc;
13
14
15 /**
16  * Creates NUM channels, all trying to join the same channel concurrently. This will lead to singleton groups
17  * and subsequent merging. To enable merging, GMS.handle_concurrent_startup has to be set to false.
18  * @author Bela Ban
19  * @version $Id: MergeStressTest.java,v 1.6 2007/03/12 11:03:06 belaban Exp $
20  */

21 public class MergeStressTest extends TestCase {
22     static CyclicBarrier JavaDoc start_connecting=null;
23     static CyclicBarrier JavaDoc received_all_views=null;
24     static CyclicBarrier JavaDoc start_disconnecting=null;
25     static CyclicBarrier JavaDoc disconnected=null;
26     static final int NUM=10;
27     static final long TIMEOUT=50000;
28     static final MyThread[] threads=new MyThread[NUM];
29     static String JavaDoc groupname="ConcurrentTestDemo";
30
31
32     static String JavaDoc props="UDP(mcast_addr=228.8.8.9;mcast_port=7788;ip_ttl=1;" +
33             "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +
34             "PING(timeout=3000;num_initial_members=3):" +
35             "MERGE2(min_interval=3000;max_interval=5000):" +
36             "FD_SOCK:" +
37             "VERIFY_SUSPECT(timeout=1500):" +
38             "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):" +
39             "UNICAST(timeout=300,600,1200,2400):" +
40             "pbcast.STABLE(desired_avg_gossip=5000):" +
41             "FRAG(frag_size=4096):" +
42             "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
43             "shun=false;print_local_addr=false;view_ack_collection_timeout=5000;" +
44             "merge_timeout=30000;handle_concurrent_startup=false)";
45
46
47
48     public MergeStressTest(String JavaDoc name) {
49         super(name);
50     }
51
52
53     static void log(String JavaDoc msg) {
54         System.out.println("-- [" + Thread.currentThread().getName() + "] " + msg);
55     }
56
57
58     public void testConcurrentStartupAndMerging() throws Exception JavaDoc {
59         start_connecting=new CyclicBarrier JavaDoc(NUM+1);
60         received_all_views=new CyclicBarrier JavaDoc(NUM+1);
61         start_disconnecting=new CyclicBarrier JavaDoc(NUM+1);
62         disconnected=new CyclicBarrier JavaDoc(NUM+1);
63
64         long start, stop;
65
66         for(int i=0; i < threads.length; i++) {
67             threads[i]=new MyThread(i);
68             threads[i].start();
69         }
70
71         // signal the threads to start connecting to their channels
72
Util.sleep(1000);
73         start_connecting.await();
74         start=System.currentTimeMillis();
75
76         try {
77             received_all_views.await();
78             stop=System.currentTimeMillis();
79             System.out.println("-- took " + (stop-start) + " msecs for all " + NUM + " threads to see all views");
80
81             int num_members;
82             MyThread t;
83             System.out.print("checking that all views have " + NUM + " members: ");
84             for(int i=0; i < threads.length; i++) {
85                 t=threads[i];
86                 num_members=t.numMembers();
87                 assertEquals(num_members, NUM);
88             }
89             System.out.println("SUCCESSFUL");
90         }
91         catch(Exception JavaDoc ex) {
92             fail(ex.toString());
93         }
94         finally {
95             start_disconnecting.await();
96             disconnected.await();
97         }
98     }
99
100
101
102
103
104     public static class MyThread extends ReceiverAdapter implements Runnable JavaDoc {
105         int index=-1;
106         long total_connect_time=0, total_disconnect_time=0;
107         private JChannel ch=null;
108         private Address my_addr=null;
109         private View current_view;
110         private Thread JavaDoc thread;
111         private int num_members=0;
112
113
114
115         public MyThread(int i) {
116             thread=new Thread JavaDoc(this, "thread #" + i);
117             index=i;
118         }
119
120         public void start() {
121             thread.start();
122         }
123
124         public void closeChannel() {
125             if(ch != null) {
126                 ch.close();
127             }
128         }
129
130         public int numMembers() {
131             return ch.getView().size();
132         }
133
134
135         public void viewAccepted(View new_view) {
136             String JavaDoc type="view";
137             if(new_view instanceof MergeView)
138                 type="merge view";
139             if(current_view == null) {
140                 current_view=new_view;
141                 log(type + " accepted: " + current_view.getVid() + " :: " + current_view.getMembers());
142             }
143             else {
144                 if(!current_view.equals(new_view)) {
145                     current_view=new_view;
146                     log(type + " accepted: " + current_view.getVid() + " :: " + current_view.getMembers());
147                 }
148             }
149
150             num_members=current_view.getMembers().size();
151             if(num_members == NUM) {
152                 synchronized(this) {
153                     this.notifyAll();
154                 }
155             }
156         }
157
158
159         public void run() {
160             View view;
161
162             try {
163                 start_connecting.await();
164                 ch=new JChannel(props);
165                 ch.setReceiver(this);
166                 log("connecting to channel");
167                 long start=System.currentTimeMillis(), stop;
168                 ch.connect(groupname);
169                 stop=System.currentTimeMillis();
170                 total_connect_time=stop-start;
171                 view=ch.getView();
172                 my_addr=ch.getLocalAddress();
173                 log(my_addr + " connected in " + total_connect_time + " msecs (" +
174                     view.getMembers().size() + " members). VID=" + ch.getView());
175
176                 synchronized(this) {
177                     while(num_members < NUM) {
178                         try {this.wait();} catch(InterruptedException JavaDoc e) {}
179                     }
180                 }
181
182                 log("reached " + num_members + " members");
183                 received_all_views.await();
184
185                 start_disconnecting.await();
186                 start=System.currentTimeMillis();
187                 ch.shutdown();
188                 stop=System.currentTimeMillis();
189
190                 log(my_addr + " shut down in " + (stop-start) + " msecs");
191                 disconnected.await();
192             }
193             catch(Exception JavaDoc e) {
194                 e.printStackTrace();
195             }
196         }
197
198
199     }
200
201
202     public static Test suite() {
203         return new TestSuite(MergeStressTest.class);
204     }
205
206     public static void main(String JavaDoc[] args) {
207         String JavaDoc[] testCaseName={MergeStressTest.class.getName()};
208         junit.textui.TestRunner.main(testCaseName);
209     }
210
211
212 }
213
Popular Tags