KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > RpcDispatcherStressTest


1 // $Id: RpcDispatcherStressTest.java,v 1.6 2004/07/05 14:15:11 belaban Exp $
2

3
4 package org.jgroups.tests;
5
6
7 import org.jgroups.*;
8 import org.jgroups.blocks.GroupRequest;
9 import org.jgroups.blocks.RpcDispatcher;
10 import org.jgroups.util.RspList;
11 import org.jgroups.util.Util;
12
13
14
15 /**
16  * Example for RpcDispatcher (see also MessageDispatcher). Multiple threads will invoke print() on
17  * all members and wait indefinitely for all responses (excluding of course crashed members). Run this
18  * on 2 nodes for an extended period of time to see whether GroupRequest.doExecute() hangs.
19  * @author Bela Ban
20  */

21 public class RpcDispatcherStressTest implements MembershipListener {
22     Channel channel;
23     RpcDispatcher disp;
24     RspList rsp_list;
25     Publisher[] threads=null;
26     int[] results;
27
28     public int print(int number) throws Exception JavaDoc {
29         return number * 2;
30     }
31
32
33     public void start(String JavaDoc props, int num_threads, long interval, boolean discard_local) throws Exception JavaDoc {
34         channel=new JChannel(props);
35         if(discard_local)
36             channel.setOpt(Channel.LOCAL, Boolean.FALSE);
37         disp=new RpcDispatcher(channel, null, this, this);
38         channel.connect("RpcDispatcherStressTestGroup");
39
40         threads=new Publisher[num_threads];
41         results=new int[num_threads];
42         for(int i=0; i < threads.length; i++) {
43             threads[i]=new Publisher(i, interval);
44             results[i]=0;
45         }
46
47         System.out.println("-- Created " + threads.length + " threads. Press enter to start them " +
48                            "('-' for sent message, '+' for received message)");
49         System.out.println("-- Press enter to stop the threads");
50         
51         System.out.flush();
52         System.in.read();
53         System.in.skip(System.in.available());
54
55         for(int i=0; i < threads.length; i++)
56             threads[i].start();
57
58         System.out.flush();
59         System.in.read();
60         System.in.skip(System.in.available());
61
62         for(int i=0; i < threads.length; i++) {
63             threads[i].stopThread();
64             threads[i].join(2000);
65         }
66
67         System.out.println("\n");
68         for(int i=0; i < threads.length; i++) {
69             System.out.println("-- thread #" + i + ": called remote method " + results[i] + " times");
70         }
71
72
73         System.out.println("Closing channel");
74         channel.close();
75         System.out.println("Closing channel: -- done");
76
77         System.out.println("Stopping dispatcher");
78         disp.stop();
79         System.out.println("Stopping dispatcher: -- done");
80     }
81
82
83     /* --------------------------------- MembershipListener interface ---------------------------------- */
84
85     public void viewAccepted(View new_view) {
86         System.out.println("-- new view: " + new_view);
87     }
88
89     public void suspect(Address suspected_mbr) {
90         System.out.println("-- suspected " + suspected_mbr);
91     }
92
93
94
95     public void block() {
96         ;
97     }
98
99     /* ------------------------------ End of MembershipListener interface -------------------------------- */
100
101
102
103     class Publisher extends Thread JavaDoc {
104         int rank=0;
105         boolean running=true;
106         int num_calls=0;
107         long interval=1000;
108
109         Publisher(int rank, long interval) {
110             super();
111             setDaemon(true);
112             this.rank=rank;
113             this.interval=interval;
114         }
115
116         public void stopThread() {
117             running=false;
118         }
119
120         public void run() {
121             while(running) {
122                 System.out.print(rank + "- ");
123                 disp.callRemoteMethods(null, "print", new Object JavaDoc[]{new Integer JavaDoc(num_calls)},
124                         new Class JavaDoc[]{int.class}, GroupRequest.GET_ALL, 0);
125                 num_calls++;
126                 System.out.print(rank + "+ ");
127                 Util.sleep(interval);
128             }
129             results[rank]=num_calls;
130         }
131     }
132
133
134
135
136     public static void main(String JavaDoc[] args) {
137         String JavaDoc props;
138         int num_threads=1;
139         long interval=1000;
140         boolean discard_local=false;
141
142         props="UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;" +
143                 "ucast_recv_buf_size=16000;ucast_send_buf_size=16000;" +
144                 "mcast_send_buf_size=32000;mcast_recv_buf_size=64000;loopback=true):"+
145                 "PING(timeout=2000;num_initial_members=3):"+
146                 "MERGE2(min_interval=5000;max_interval=10000):"+
147                 "FD_SOCK:"+
148                 "VERIFY_SUSPECT(timeout=1500):"+
149                 "pbcast.NAKACK(gc_lag=50;retransmit_timeout=1000,1500,2000,3000;max_xmit_size=8192):"+
150                 "UNICAST(timeout=1000,1500,2000,3000):"+
151                 "pbcast.STABLE(desired_avg_gossip=10000):"+
152                 "FRAG(frag_size=8192;down_thread=false;up_thread=false):"+
153                 "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true):"+
154                 "pbcast.STATE_TRANSFER";
155         
156         try {
157             for(int i=0; i < args.length; i++) {
158                 if("-num_threads".equals(args[i])) {
159                     num_threads=Integer.parseInt(args[++i]);
160                     continue;
161                 }
162                 if("-interval".equals(args[i])) {
163                     interval=Long.parseLong(args[++i]);
164                     continue;
165                 }
166                 if("-props".equals(args[i])) {
167                     props=args[++i];
168                     continue;
169                 }
170                 if("-discard_local".equals(args[i])) {
171                     discard_local=true;
172                     continue;
173                 }
174                 help();
175                 return;
176             }
177
178
179             new RpcDispatcherStressTest().start(props, num_threads, interval, discard_local);
180         }
181         catch(Exception JavaDoc e) {
182             System.err.println(e);
183         }
184     }
185
186
187     static void help() {
188         System.out.println("RpcDispatcherStressTest [-help] [-interval <msecs>] " +
189                            "[-num_threads <number>] [-props <stack properties>] [-discard_local]");
190     }
191 }
192
Popular Tags