// KickJava - Java API By Example, From Geeks To Geeks.
//
// Java > Open Source Codes > org > jgroups > tests > UnicastStressTest


package org.jgroups.tests;

import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.View;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.Util;

import java.util.Vector;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

15 /**
16  * Tests UNICAST by sending anycast messages via an RpcDispatcher
17  * @author Bela Ban
18  * @version $Id: UnicastStressTest.java,v 1.3 2007/04/24 13:57:19 belaban Exp $
19  */

20 public class UnicastStressTest {
21     int num_channels=6;
22     int num_threads=1; // number of threads per channel
23
int num_msgs=1000; // number of messages sent by 1 thread
24
int msg_size=4096; // number of bytes / message
25
String JavaDoc props=null;
26     int buddies=1;
27
28     private JChannel[] channels;
29     private RpcDispatcher[] dispatchers;
30     private Receiver[] receivers;
31
32     final AtomicInteger JavaDoc msgs_received=new AtomicInteger JavaDoc(0);
33     final AtomicLong JavaDoc bytes_received=new AtomicLong JavaDoc(0);
34
35     final CyclicBarrier JavaDoc start_barrier;
36     final CyclicBarrier JavaDoc terminate_barrier;
37
38
39     public UnicastStressTest(String JavaDoc props, int num_channels, int num_threads, int num_msgs, int msg_size, int buddies) {
40         this.props=props;
41         this.num_channels=num_channels;
42         this.num_threads=num_threads;
43         this.num_msgs=num_msgs;
44         this.msg_size=msg_size;
45         this.buddies=buddies;
46         start_barrier=new CyclicBarrier JavaDoc(num_channels * num_threads +1);
47         terminate_barrier=new CyclicBarrier JavaDoc(num_channels +1);
48         if(buddies > num_channels)
49             throw new IllegalArgumentException JavaDoc("buddies needs to be smaller than number of channels");
50     }
51
52
53     private void start() throws Exception JavaDoc {
54         channels=new JChannel[num_channels];
55         receivers=new Receiver[num_channels];
56         dispatchers=new RpcDispatcher[num_channels];
57         long start, stop;
58
59         int num_expected_msgs=num_threads * num_msgs * buddies;
60         int num_total_msgs=num_channels * num_threads * num_msgs; // over all channels
61
for(int i=0; i < channels.length; i++) {
62             channels[i]=new JChannel(props);
63             receivers[i]=new Receiver(terminate_barrier, bytes_received, msgs_received, num_expected_msgs, num_total_msgs);
64             dispatchers[i]=new RpcDispatcher(channels[i], null, null, receivers[i]);
65             channels[i].connect("x");
66         }
67
68         // start the senders
69
for(int i=0; i < channels.length; i++) {
70             JChannel channel=channels[i];
71             View view=channel.getView();
72             Vector JavaDoc<Address> members=view.getMembers();
73             if(members.size() != num_channels) {
74                 throw new Exception JavaDoc("cluster has not formed correctly, expected " + num_channels + " channels, found" +
75                         " only " + members.size() + " (view: " + view + ")");
76             }
77             Vector JavaDoc<Address> tmp=pickBuddies(members, channel.getLocalAddress());
78
79             for(int j=0; j < num_threads; j++) {
80                 Sender sender=new Sender(start_barrier, msg_size, num_msgs, dispatchers[i], channel.getLocalAddress(), tmp);
81                 sender.start(); // will wait on barrier
82
}
83         }
84
85         System.out.println("sending " + num_total_msgs + " msgs with " + num_threads + " threads over " + num_channels + " channels");
86
87         start_barrier.await(); // signals all senders to start
88
start=System.currentTimeMillis();
89
90
91         terminate_barrier.await(); // when all receivers have received all messages
92
stop=System.currentTimeMillis();
93
94         for(int i=0; i < dispatchers.length; i++) {
95             dispatchers[i].stop();
96         }
97         for(int i=channels.length -1; i >= 0; i--) {
98             channels[i].close();
99         }
100
101         printStats(stop - start);
102     }
103
104     private void printStats(long time) {
105         for(int i=0; i < receivers.length; i++) {
106             System.out.println("receiver #" + (i+1) + ": " + receivers[i].getNumReceivedMessages());
107         }
108         System.out.println("total received messages for " + num_channels + " channels: " + msgs_received.get());
109         System.out.println("total bytes received by " + num_channels + " channels: " + Util.printBytes(bytes_received.get()));
110         System.out.println("time: " + time + " ms");
111         double msgs_per_sec=msgs_received.get() / (time /1000.0);
112         double throughput=bytes_received.get() / (time / 1000.0);
113         System.out.println("Message rate: " + msgs_per_sec + " msgs/sec");
114         System.out.println("Throughput: " + Util.printBytes(throughput) + " / sec");
115     }
116
117     private Vector JavaDoc<Address> pickBuddies(Vector JavaDoc<Address> members, Address local_addr) {
118         Vector JavaDoc<Address> retval=new Vector JavaDoc<Address>();
119         int index=members.indexOf(local_addr);
120         if(index < 0)
121             return null;
122         for(int i=index +1; i <= index + buddies; i++) {
123             int real_index=i % members.size();
124             Address buddy=members.get(real_index);
125             retval.add(buddy);
126         }
127         return retval;
128     }
129
130
131     public static class Receiver {
132         final AtomicInteger JavaDoc msgs;
133         final AtomicLong JavaDoc bytes;
134         final int num_expected_msgs, num_total_msgs, print;
135         final CyclicBarrier JavaDoc barrier;
136         final AtomicInteger JavaDoc num_received_msgs=new AtomicInteger JavaDoc(0);
137
138
139         public Receiver(CyclicBarrier JavaDoc barrier, AtomicLong JavaDoc bytes, AtomicInteger JavaDoc msgs, int num_expected_msgs, int num_total_msgs) {
140             this.barrier=barrier;
141             this.bytes=bytes;
142             this.msgs=msgs;
143             this.num_expected_msgs=num_expected_msgs;
144             this.num_total_msgs=num_total_msgs;
145             print=num_total_msgs / 10;
146         }
147
148         public int getNumReceivedMessages() {return num_received_msgs.get();}
149
150         public void receive(byte[] data) {
151             msgs.incrementAndGet();
152             bytes.addAndGet(data.length);
153
154             int count=num_received_msgs.incrementAndGet();
155             if(count % print == 0) {
156                 System.out.println("received " + count + " msgs");
157             }
158             
159             if((count=num_received_msgs.get()) >= num_expected_msgs) {
160                 try {
161                     barrier.await();
162                 }
163                 catch(Exception JavaDoc e) {
164                 }
165             }
166
167         }
168     }
169
170     private static class Sender extends Thread JavaDoc {
171         private final CyclicBarrier JavaDoc barrier;
172         private final int num_msgs;
173         private final int msg_size;
174         private final RpcDispatcher disp;
175         private final Vector JavaDoc buddies;
176
177
178         public Sender(CyclicBarrier JavaDoc barrier, int msg_size, int num_msgs, RpcDispatcher disp, Address local_addr, Vector JavaDoc buddies) {
179             this.barrier=barrier;
180             this.msg_size=msg_size;
181             this.num_msgs=num_msgs;
182             this.disp=disp;
183             this.buddies=buddies;
184             setName("Sender (" + local_addr + " --> " + buddies + ")");
185         }
186
187         public void run() {
188             final byte[] data=new byte[msg_size];
189             final Object JavaDoc[] arg=new Object JavaDoc[]{data};
190             final Class JavaDoc[] types=new Class JavaDoc[]{byte[].class};
191
192             try {
193                 barrier.await();
194             }
195             catch(Exception JavaDoc e) {
196             }
197
198             for(int i=0; i < num_msgs; i++) {
199                 disp.callRemoteMethods(buddies, "receive", arg, types, GroupRequest.GET_NONE, 5000, true);
200             }
201         }
202     }
203
204
205
206     public static void main(String JavaDoc[] args) throws Exception JavaDoc {
207         int num_channels=6;
208         int num_threads=10; // number of threads per channel
209
int num_msgs=10000; // number of messages sent by 1 thread
210
int msg_size=4096; // number of bytes / message
211
int buddies=1;
212         String JavaDoc props=null;
213
214         for(int i=0; i < args.length; i++) {
215             if(args[i].equalsIgnoreCase("-props")) {
216                 props=args[++i];
217                 continue;
218             }
219             if(args[i].equalsIgnoreCase("-num_channels")) {
220                 num_channels=Integer.parseInt(args[++i]);
221                 continue;
222             }
223             if(args[i].equalsIgnoreCase("-num_threads")) {
224                 num_threads=Integer.parseInt(args[++i]);
225                 continue;
226             }
227             if(args[i].equalsIgnoreCase("-num_msgs")) {
228                 num_msgs=Integer.parseInt(args[++i]);
229                 continue;
230             }
231             if(args[i].equalsIgnoreCase("-msg_size")) {
232                 msg_size=Integer.parseInt(args[++i]);
233                 continue;
234             }
235             if(args[i].equalsIgnoreCase("-buddies")) {
236                 buddies=Integer.parseInt(args[++i]);
237                 continue;
238             }
239             help();
240             return;
241         }
242
243         new UnicastStressTest(props, num_channels, num_threads, num_msgs, msg_size, buddies).start();
244     }
245
246
247
248
249     private static void help() {
250         System.out.println("UnicastStressTest [-help] [-props <props>] [-num_channels <num>] " +
251                 "[-num_threads <threads per channel>] [-num_msgs <number of msgs per thread>] [-msg_size <size in bytes>] " +
252                 "[-buddies <num>]");
253     }
254 }
// Popular Tags