KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > dtf > DistributedTest


1 /*
2 * JBoss, the OpenSource J2EE webOS
3 *
4 * Distributable under LGPL license.
5 * See terms of license at gnu.org.
6 */

7 package org.jboss.dtf;
8
9 import junit.framework.TestCase;
10 import org.jgroups.Address;
11 import org.jgroups.Channel;
12 import org.jgroups.ChannelException;
13 import org.jgroups.JChannel;
14 import org.jgroups.MembershipListener;
15 import org.jgroups.View;
16 import org.jgroups.blocks.GroupRequest;
17 import org.jgroups.blocks.MethodCall;
18 import org.jgroups.blocks.RpcDispatcher;
19
20 import java.util.Vector JavaDoc;
21
22 /**
23  * This class should be used as the base class for any distributed test classes
24  * or other classes where you need to have a remote barrier so all of the
25  * classes can start at the same time.
26  * <br>
27  * This class requires JGroups 2.0 to run
28  *
29  * @author <a HREF="mailto:telrod@vocalocity.net">Tom Elrod</a>
30  * @version $Revision: 1.2.2.3 $
31  */

32 public class DistributedTest
33       extends TestCase
34       implements MembershipListener
35 {
36    private int parties = 2; //defaults to 2 since most common number of instances (since will always be > 1)
37

38    private Channel channel;
39    private RpcDispatcher disp;
40    private Address localAddress;
41    private String JavaDoc props = "UDP(mcast_recv_buf_size=64000;mcast_send_buf_size=32000;" +
42                           "mcast_port=45566;use_packet_handler=false;ucast_recv_buf_size=64000;" +
43                           "mcast_addr=228.8.8.8;loopback=false;ucast_send_buf_size=32000;ip_ttl=32):" +
44                           "PING(timeout=2000;num_initial_members=3):" +
45                           "MERGE2(max_interval=10000;min_interval=5000):" +
46                           "FD(timeout=2000;max_tries=3;shun=true):" +
47                           "VERIFY_SUSPECT(timeout=1500):" +
48                           "pbcast.NAKACK(max_xmit_size=8192;gc_lag=50;retransmit_timeout=600,1200,2400,4800):" +
49                           "UNICAST(timeout=1200,2400,3600):" +
50                           "pbcast.STABLE(desired_avg_gossip=20000):" +
51                           "FRAG(frag_size=8192;down_thread=false;up_thread=false):" +
52                           "pbcast.GMS(print_local_addr=true;join_timeout=3000;join_retry_timeout=2000;shun=true)";
53
54
55    // How long should wait for everyone to be ready to start. Default 10 seconds
56
private long startupTimeout = 20000;
57    // How long should wait for everyone to be ready to shutdown.
58
// Default 2 minutes since could take a long time to run tests.
59
private long shutdownTimeout = 120000;
60
61    private int shutdownCount;
62
63    private boolean startupWaitFlag = false;
64    private boolean shutdownWaitFlag = false;
65
66    private final Object JavaDoc waitObj = new Object JavaDoc();
67
68    public DistributedTest(String JavaDoc name)
69    {
70       super(name);
71       parties = Integer.getInteger("jboss.test.distributed.instancecount", 2).intValue();
72    }
73
74    public int getNumberOfInstances()
75    {
76       return parties;
77    }
78
79    public long getShutdownTimeout()
80    {
81       return shutdownTimeout;
82    }
83
84    protected void setShutdownTimeout(long timeout)
85    {
86       this.shutdownTimeout = timeout;
87    }
88
89    protected void settUp() throws Exception JavaDoc
90    {
91       shutdownCount = parties;
92       startupWaitFlag = true;
93       sendStartupNotification();
94
95       long startTime = System.currentTimeMillis();
96       while(startupWaitFlag)
97       {
98          try
99          {
100             synchronized(waitObj)
101             {
102                waitObj.wait(1000);
103             }
104
105             if(timeoutExpired(startTime, startupTimeout))
106             {
107                break;
108             }
109          }
110          catch(InterruptedException JavaDoc e)
111          {
112             break;
113          }
114       }
115
116       if(startupWaitFlag)
117       {
118          // we timed out and still not everyone joined
119
disp.stop();
120          channel.disconnect();
121          throw new Exception JavaDoc("Timed out waiting for other instances to start.");
122       }
123    }
124
125    protected void shutDown() throws Exception JavaDoc
126    {
127       shutdownWaitFlag = true;
128       sendShutdownNotification();
129
130       long startTime = System.currentTimeMillis();
131       while(shutdownWaitFlag)
132       {
133          try
134          {
135             //TODO: Need to same waitObj.wait(1000) as is done in startup()
136
Thread.sleep(1000);
137             if(timeoutExpired(startTime, shutdownTimeout))
138             {
139                break;
140             }
141          }
142          catch(InterruptedException JavaDoc e)
143          {
144          }
145       }
146
147       if(shutdownWaitFlag)
148       {
149          // we timed out
150
throw new Exception JavaDoc("Timed out waiting for other instances to stop.");
151       }
152       disp.stop();
153       channel.disconnect();
154    }
155
156    private boolean timeoutExpired(long startTime, long timeout)
157    {
158       long duration = System.currentTimeMillis() - startTime;
159       if(duration > timeout)
160       {
161          return true;
162       }
163       else
164       {
165          return false;
166       }
167    }
168
169    private void sendStartupNotification() throws ChannelException
170    {
171       //JGroups code
172
channel = new JChannel(props);
173       disp = new RpcDispatcher(channel, null, this, this);
174       channel.connect("DistributedTestCase");
175       localAddress = channel.getLocalAddress();
176    }
177
178    private void sendShutdownNotification()
179    {
180       MethodCall call = new MethodCall("receiveShutdownNotification",
181                                        new Object JavaDoc[]{localAddress}, new Class JavaDoc[]{Address.class});
182       disp.callRemoteMethods(null, call, GroupRequest.GET_NONE, 0);
183    }
184
185    public void viewAccepted(View view)
186    {
187       // has everyone joined
188
Vector JavaDoc members = view.getMembers();
189       int numOfMembers = members.size();
190       if(numOfMembers >= parties && startupWaitFlag) // waiting for everyone to start
191
{
192          startupWaitFlag = false;
193          synchronized(waitObj)
194          {
195             waitObj.notify();
196          }
197       }
198    }
199
200    public void receiveShutdownNotification(Address address)
201    {
202       if(--shutdownCount == 0 && shutdownWaitFlag) // waiting for everyone to stop
203
{
204          shutdownWaitFlag = false;
205       }
206    }
207
208    public void suspect(Address address)
209    {
210    }
211
212    public void block()
213    {
214    }
215 }
216
Popular Tags