KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > UnicastTest


// $Id: UnicastTest.java,v 1.5 2005/02/07 08:38:25 belaban Exp $


package org.jgroups.tests;

import org.jgroups.*;
import org.jgroups.util.Util;

import java.io.*;
import java.util.Vector;

12 /**
13  * Tests the UNICAST by sending unicast messages between a sender and a receiver
14  *
15  * @author Bela Ban
16  */

17 public class UnicastTest implements Runnable JavaDoc {
18     UnicastTest test;
19     JChannel channel;
20     final String JavaDoc groupname="UnicastTest-Group";
21     Thread JavaDoc t=null;
22     long sleep_time=0;
23     boolean exit_on_end=false, busy_sleep=false;
24
25
26     public static class Data implements Externalizable {
27         public Data() {
28         }
29
30         public void writeExternal(ObjectOutput out) throws IOException {
31         }
32
33         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
34         }
35     }
36
37     public static class StartData extends Data {
38         long num_values=0;
39
40         public StartData() {
41             super();
42         }
43
44         StartData(long num_values) {
45             this.num_values=num_values;
46         }
47
48         public void writeExternal(ObjectOutput out) throws IOException {
49             out.writeLong(num_values);
50         }
51
52         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
53             num_values=in.readLong();
54         }
55     }
56
57     public static class Value extends Data {
58         long value=0;
59
60         public Value() {
61             super();
62         }
63
64         Value(long value) {
65             this.value=value;
66         }
67
68         public void writeExternal(ObjectOutput out) throws IOException {
69             out.writeLong(value);
70         }
71
72         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
73             value=in.readLong();
74         }
75     }
76
77
78     public void init(String JavaDoc props, long sleep_time, boolean exit_on_end, boolean busy_sleep) throws Exception JavaDoc {
79         this.sleep_time=sleep_time;
80         this.exit_on_end=exit_on_end;
81         this.busy_sleep=busy_sleep;
82         channel=new JChannel(props);
83         channel.connect(groupname);
84         t=new Thread JavaDoc(this, "UnicastTest - receiver thread");
85         t.start();
86     }
87
88
89     public void run() {
90         Data data;
91         Message msg;
92         Object JavaDoc obj;
93         boolean started=false;
94         long start=0, stop=0;
95         long current_value=0, tmp=0, num_values=0;
96         long total_msgs=0, total_time=0, msgs_per_sec;
97
98         while(true) {
99             try {
100                 obj=channel.receive(0);
101                 if(obj instanceof View)
102                     System.out.println("** view: " + obj);
103                 else
104                     if(obj instanceof Message) {
105                         msg=(Message)obj;
106                         data=(Data)msg.getObject();
107
108                         if(data instanceof StartData) {
109                             if(started) {
110                                 System.err.println("UnicastTest.run(): received START data, but am already processing data");
111                                 continue;
112                             }
113                             else {
114                                 started=true;
115                                 current_value=0; // first value to be received
116
tmp=0;
117                                 num_values=((StartData)data).num_values;
118                                 start=System.currentTimeMillis();
119                             }
120                         }
121                         else
122                             if(data instanceof Value) {
123                                 tmp=((Value)data).value;
124                                 if(current_value + 1 != tmp) {
125                                     System.err.println("-- message received (" + tmp + ") is not 1 greater than " + current_value);
126                                 }
127                                 else {
128                                     current_value++;
129                                     if(current_value % 100 == 0)
130                                         System.out.println("received " + current_value);
131                                     if(current_value >= num_values) {
132                                         stop=System.currentTimeMillis();
133                                         total_time=stop - start;
134                                         msgs_per_sec=(long)(num_values / (total_time / 1000.0));
135                                         System.out.println("-- received " + num_values + " messages in " + total_time +
136                                                            " ms (" + msgs_per_sec + " messages/sec)");
137                                         started=false;
138                                         if(exit_on_end)
139                                             System.exit(0);
140                                         continue;
141                                     }
142                                 }
143                             }
144                     }
145             }
146             catch(ChannelNotConnectedException not_connected) {
147                 System.err.println(not_connected);
148                 break;
149             }
150             catch(ChannelClosedException closed_ex) {
151                 System.err.println(closed_ex);
152                 break;
153             }
154             catch(TimeoutException timeout) {
155                 System.err.println(timeout);
156                 break;
157             }
158             catch(Throwable JavaDoc t) {
159                 System.err.println(t);
160                 started=false;
161                 current_value=0;
162                 tmp=0;
163                 Util.sleep(1000);
164             }
165         }
166         // System.out.println("UnicastTest.run(): receiver thread terminated");
167
}
168
169
170     public void eventLoop() throws Exception JavaDoc {
171         int c;
172
173         while(true) {
174             System.out.print("[1] Send msgs [2] Print view [q] Quit ");
175             System.out.flush();
176             c=System.in.read();
177             switch(c) {
178             case -1:
179                 break;
180             case '1':
181                 sendMessages();
182                 break;
183             case '2':
184                 printView();
185                 break;
186             case '3':
187                 break;
188             case '4':
189                 break;
190             case '5':
191                 break;
192             case '6':
193                 break;
194             case 'q':
195                 channel.close();
196                 return;
197             default:
198                 break;
199             }
200         }
201     }
202
203
204     void sendMessages() throws Exception JavaDoc {
205         long num_msgs=getNumberOfMessages();
206         Address receiver=getReceiver();
207         Message msg;
208         Value val=new Value(1);
209
210         if(receiver == null) {
211             System.err.println("UnicastTest.sendMessages(): receiver is null, cannot send messages");
212             return;
213         }
214
215         System.out.println("sending " + num_msgs + " messages to " + receiver);
216         msg=new Message(receiver, null, new StartData(num_msgs));
217         channel.send(msg);
218
219         for(int i=1; i <= num_msgs; i++) {
220             val=new Value(i);
221             msg=new Message(receiver, null, val);
222             if(i % 100 == 0)
223                 System.out.println("-- sent " + i);
224             channel.send(msg);
225             if(sleep_time > 0)
226                 Util.sleep(sleep_time, busy_sleep);
227         }
228         System.out.println("done sending " + num_msgs + " to " + receiver);
229     }
230
231     void printView() {
232         System.out.println("\n-- view: " + channel.getView() + '\n');
233         try {
234             System.in.skip(System.in.available());
235         }
236         catch(Exception JavaDoc e) {
237         }
238     }
239
240
241     long getNumberOfMessages() {
242         BufferedReader reader=null;
243         String JavaDoc tmp=null;
244
245         try {
246             System.out.print("Number of messages to send: ");
247             System.out.flush();
248             System.in.skip(System.in.available());
249             reader=new BufferedReader(new InputStreamReader(System.in));
250             tmp=reader.readLine().trim();
251             return Long.parseLong(tmp);
252         }
253         catch(Exception JavaDoc e) {
254             System.err.println("UnicastTest.getNumberOfMessages(): " + e);
255             return 0;
256         }
257     }
258
259     Address getReceiver() {
260         Vector JavaDoc mbrs=null;
261         int index;
262         BufferedReader reader;
263         String JavaDoc tmp;
264
265         try {
266             mbrs=channel.getView().getMembers();
267             System.out.println("pick receiver from the following members:");
268             for(int i=0; i < mbrs.size(); i++) {
269                 if(mbrs.elementAt(i).equals(channel.getLocalAddress()))
270                     System.out.println("[" + i + "]: " + mbrs.elementAt(i) + " (self)");
271                 else
272                     System.out.println("[" + i + "]: " + mbrs.elementAt(i));
273             }
274             System.out.flush();
275             System.in.skip(System.in.available());
276             reader=new BufferedReader(new InputStreamReader(System.in));
277             tmp=reader.readLine().trim();
278             index=Integer.parseInt(tmp);
279             return (Address)mbrs.elementAt(index); // index out of bounds caught below
280
}
281         catch(Exception JavaDoc e) {
282             System.err.println("UnicastTest.getReceiver(): " + e);
283             return null;
284         }
285     }
286
287
288     public static void main(String JavaDoc[] args) {
289         long sleep_time=0;
290         boolean exit_on_end=false;
291         boolean busy_sleep=false;
292
293         String JavaDoc udp_props="UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;" +
294                 "ucast_recv_buf_size=32000;ucast_send_buf_size=64000;" +
295                 "mcast_send_buf_size=32000;mcast_recv_buf_size=64000;loopback=true):";
296
297         String JavaDoc regular_props="PING(timeout=1000;num_initial_members=2):" +
298                 "MERGE2(min_interval=5000;max_interval=10000):" +
299                 "FD_SOCK:" +
300                 "VERIFY_SUSPECT(timeout=1500):" +
301                 "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):" +
302                 "UNICAST(timeout=2000,4000,6000;window_size=100;min_threshold=10;use_gms=false):" +
303                 "pbcast.STABLE(desired_avg_gossip=20000):" +
304                 "FRAG(frag_size=8192;down_thread=false;up_thread=false):" +
305                 "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)";
306
307         String JavaDoc props=udp_props + regular_props;
308         String JavaDoc loopback_props="LOOPBACK:" + regular_props;
309
310         for(int i=0; i < args.length; i++) {
311             if("-help".equals(args[i])) {
312                 help();
313                 return;
314             }
315             if("-props".equals(args[i])) {
316                 props=args[++i];
317                 continue;
318             }
319             if("-sleep".equals(args[i])) {
320                 sleep_time=Long.parseLong(args[++i]);
321                 continue;
322             }
323             if("-loopback".equals(args[i])) {
324                 props=loopback_props;
325                 continue;
326             }
327             if("-exit_on_end".equals(args[i])) {
328                 exit_on_end=true;
329                 continue;
330             }
331             if("-busy_sleep".equals(args[i])) {
332                 busy_sleep=true;
333                 continue;
334             }
335         }
336
337
338         try {
339             UnicastTest test=new UnicastTest();
340             test.init(props, sleep_time, exit_on_end, busy_sleep);
341             test.eventLoop();
342         }
343         catch(Exception JavaDoc ex) {
344             System.err.println(ex);
345         }
346     }
347
348     static void help() {
349         System.out.println("UnicastTest [-help] [-props <props>] [-sleep <time in ms between msg sends] " +
350                            "[-loopback] [-exit_on_end] [-busy-sleep]");
351     }
352 }
353
Popular Tags