KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > ContinousThroughputTest


package org.jgroups.tests;

import org.jgroups.*;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolObserver;
import org.jgroups.util.Util;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Date;
14
15
16 /**
17  * <h1>ContinousThroughputTest.java</h1>
18  * <p/>
19  * This is a program to make Throughput tests.
20  * <p/>
21  * The program assumes to run on a reliable network where no partitioning or failures happen (Apart for cping test).
22  * Once you run the program it connects the channel and gives you a prompt.
23  * Every time a new view is received you will see it printed.
24  * Once you have launched the program on all the machine you use for the test just digit
25  * on one machine the command for the test you desire to make, you will be asked for the necessary parameters,
26  * then the test starts.
27  * Depending on the chosen test you will see its results on the monitor and them ar logged
28  * on a file on the working dir called <code>"ContinousThroughputTest<hostname><systemTimeInSeconds>.log"</code> .
29  *
30  * @author Gianluca Collot
31  * @version 1.0
32  */

33
34 public class ContinousThroughputTest {
35     String JavaDoc props="UDP:" +
36             "PING(up_thread=false;down_thread=false):" +
37             "FD(timeout=1000;shun=false):" +
38             "STABLE(up_thread=false;down_thread=false):" +
39             "MERGE(up_thread=false;down_thread=false):" +
40             "NAKACK:" +
41             "FLUSH:" +
42             "GMS:" +
43             "VIEW_ENFORCER(up_thread=false;down_thread=false):" +
44 // "TSTAU:" +
45
"QUEUE(up_thread=false;down_thread=false)";
46 // String props= "TCP:TCPPING(initial_hosts=manolete2[8880]):FD(timeout=10000):" +
47
// "STABLE:MERGE:NAKACK:FRAG:FLUSH:GMS:VIEW_ENFORCER:QUEUE";
48
JChannel channel=null;
49     Thread JavaDoc sendThread, receiveThread;
50     boolean coordinator=false;
51     IpAddress my_addr=null;
52     View view;
53     BufferedReader JavaDoc reader;
54     float troughputSum=0, meanTroughput=0, minTroughput=10000, maxTroughput=0;
55     int numTests=0;
56     FileWriter JavaDoc logWriter;
57     Protocol prot=null;
58
59     /**
60      * Creates threads, creates and connects channel opens log file
61      */

62
63     public ContinousThroughputTest() {
64         sendThread=new Thread JavaDoc("sendThread") {
65             public void run() {
66                 parser();
67             }
68         };
69         receiveThread=new Thread JavaDoc("receiveThread") {
70             public void run() {
71                 checkChannel();
72             }
73         };
74         reader=new BufferedReader JavaDoc(new InputStreamReader JavaDoc(System.in));
75         try {
76             channel=new JChannel(props);
77 // prot = (Protocol) channel.getProtocolStack().getProtocols().lastElement();
78
// prot.setObserver(new ContinousThroughputTest.MessageLenghtObserver());
79
channel.setOpt(Channel.BLOCK, Boolean.FALSE);
80             channel.setOpt(Channel.SUSPECT, Boolean.FALSE);
81             channel.connect("Janus");
82         }
83         catch(Exception JavaDoc ex) {
84             System.out.println("Connection Failed!" + ex);
85             System.exit(1);
86         }
87         my_addr=(IpAddress)channel.getLocalAddress();
88
89         try {
90             File JavaDoc log=new File JavaDoc("ContinousThroughputTest" + my_addr.getIpAddress().getHostName()
91                     + (System.currentTimeMillis() / 10000) + ".log");
92             if(!log.exists()) {
93                 log.createNewFile();
94             }
95             logWriter=new FileWriter JavaDoc(log);
96             logWriter.write("ContinousThroughputTest.java log\r\n");
97             logWriter.write("Date:" + new Date JavaDoc(System.currentTimeMillis()) + "\r\n");
98             log("Protocol Stack is " + props);
99             System.out.println("Protocol Stack is " + props);
100         }
101         catch(Exception JavaDoc ex) {
102             System.out.println("File problems " + ex);
103             System.exit(5);
104         }
105     }
106
107     static void main(String JavaDoc[] args) {
108         ContinousThroughputTest perfTest=new ContinousThroughputTest();
109         perfTest.go();
110     }
111
112     void go() {
113 // Starts Receiving
114
receiveThread.start();
115 // Starts input Parser
116
sendThread.start();
117     }
118
119     /**
120      * This function should be called in its own thread.
121      * It recives messages and calculates the troughput
122      */

123
124     public void checkChannel() {
125         String JavaDoc payload=null;
126         Object JavaDoc received=null;
127         Message msg=null;
128         boolean done=false;
129         long n;
130         int i=1;
131
132         System.out.println("Started receiving");
133         try {
134             while(!done) {
135                 received=channel.receive(0);
136                 if(received instanceof Message) {
137                     msg=(Message)received;
138                     payload=(String JavaDoc)msg.getObject();
139                     System.out.println(payload);
140                     if("stop".equalsIgnoreCase(payload)) {
141                         done=true;
142                     }
143                     if("pingpong".equalsIgnoreCase(payload)) {
144                         n=((Long JavaDoc)((Message)channel.receive(0)).getObject()).longValue();
145                         i=((Integer JavaDoc)((Message)channel.receive(0)).getObject()).intValue();
146                         log("Starting pingpong test. Rounds: " + n + " Bursts: " + i);
147                         pingpongTest(n, i, false);
148                     }
149                     if("cping".equalsIgnoreCase(payload)) {
150 // i = ((Integer) ((Message) channel.receive(0)).getObject()).intValue();
151
log("Starting cping test. Bursts: " + 1);
152                         cpingTest(1, true);
153                     }
154                     if("sweep".equalsIgnoreCase(payload)) {
155                         n=((Long JavaDoc)((Message)channel.receive(0)).getObject()).longValue();
156                         i=((Integer JavaDoc)((Message)channel.receive(0)).getObject()).intValue();
157                         log("Starting sweep test. Rounds: " + n + " initial burst: " + i);
158                         sweep(n, i);
159                     }
160                 }
161                 if(received instanceof View) {
162                     view=(View)received;
163                     System.out.println(view);
164                     if(view.getMembers().elementAt(0).equals(my_addr)) {
165                         System.out.println("I'm the new Coordinator");
166                         coordinator=true;
167                     }
168                     resetData();
169                 }
170             }
171         }
172         catch(Exception JavaDoc ex) {
173             System.out.println("checkChannel() :" + ex);
174             try {
175                 logWriter.write("Stopped cause " + ex + "\r\n");
176             }
177             catch(Exception JavaDoc e) {
178             }
179             System.exit(2);
180         }
181         System.out.println("Stopped Receiving");
182
183         channel.disconnect();
184         System.out.println("Disconnected from \"Janus\"");
185         channel.close();
186         System.out.println("Channel Closed");
187         System.exit(0);
188     }
189
190     /**
191      * This function should be run in its own thread and sends messages on an already connected channel
192      */

193     public void parser() {
194         boolean done=false;
195         String JavaDoc input;
196         int number=0;
197         int burstlength=1;
198
199         System.out.println("Ready.");
200         try {
201             while(!done) {
202                 input=reader.readLine();
203                 if("stop".equalsIgnoreCase(input)) {
204                     done=true;
205                 }
206                 if("pingpong".equalsIgnoreCase(input)) {
207                     number=askNumber(reader, "How many rounds?");
208                     burstlength=askNumber(reader, "Length of bursts?");
209                     channel.send(new Message(null, null, input));
210                     channel.send(new Message(null, null, new Long JavaDoc(number)));
211                     channel.send(new Message(null, null, new Integer JavaDoc(burstlength)));
212                     continue;
213
214                 }
215                 if("cping".equalsIgnoreCase(input)) {
216 // burstlength = askNumber( reader,"Length of bursts?");
217
channel.send(new Message(null, null, input));
218 // channel.send(new Message(null,null,new Integer(burstlength)));
219
continue;
220                 }
221                 if("sweep".equalsIgnoreCase(input)) {
222                     number=askNumber(reader, "Number of tests");
223                     burstlength=askNumber(reader, "Initial length of bursts?");
224                     channel.send(new Message(null, null, input));
225                     channel.send(new Message(null, null, new Long JavaDoc(number)));
226                     channel.send(new Message(null, null, new Integer JavaDoc(burstlength)));
227                     continue;
228                 }
229                 channel.send(new Message(null, null, input));
230             }
231         }
232         catch(Exception JavaDoc ex) {
233             System.out.println(ex);
234         }
235     }
236
237     /**
238      * sendBurst(int n): sends a burst of messages with small payload
239      */

240
241     void sendBurst(long n) {
242         try {
243             byte[] buf=Util.objectToByteBuffer("Standard Mex");
244             for(int i=0; i < n; i++) {
245                 channel.send(new Message(null, null, buf));
246             }
247         }
248         catch(Exception JavaDoc ex) {
249             System.out.println("sendBurst: " + ex);
250         }
251     }
252
253
254     /**
255      * showStats: Prints resulting times and troughput
256      */

257
258     void showStats(long start, long stop, long messages, int burstlength) {
259         String JavaDoc result;
260         long elapsedTime=(stop - start);
261         long troughPut=(messages * 1000) / elapsedTime;
262 // troughputSum += troughPut;
263
maxTroughput=(maxTroughput > troughPut) ? maxTroughput : troughPut;
264         minTroughput=(minTroughput < troughPut) ? minTroughput : troughPut;
265 // System.out.println("Elapsed Time: " + (stop-start) + " milliseconds to receive " + messages + " messages");
266
result="Elapsed Time: " + (stop - start) +
267                 "| messages:" + messages +
268                 "| burst length:" + burstlength +
269                 "| Troughput:" + troughPut +
270                 "| max: " + maxTroughput +
271                 "| min: " + minTroughput +
272                 "\r\n";
273         System.out.println(result);
274         try {
275             logWriter.write(result);
276             logWriter.flush();
277         }
278         catch(Exception JavaDoc ex) {
279             System.out.println("showStats():" + ex);
280         }
281
282     }
283
284     int askNumber(BufferedReader JavaDoc reader, String JavaDoc text) {
285         int number=0;
286         String JavaDoc input="10";
287         System.out.println(text);
288         try {
289             input=reader.readLine();
290         }
291         catch(Exception JavaDoc ex) {
292             System.out.println("AskNumber :" + ex);
293         }
294
295         number=Integer.parseInt(input);
296         return number;
297     }
298
299     /**
300      * Resets stored statistics and counters
301      */

302
303     void resetData() {
304         maxTroughput=0;
305         minTroughput=10000;
306         meanTroughput=0;
307         numTests=0;
308         troughputSum=0;
309     }
310
311     /**
312      * Make a pingpong test:
313      * For n times a message is sent and view.size() messages are received
314      * Every 1000 messages sent the throughput is evaluated or at the end of the test
315      */

316     void pingpongTest(long n, int burst_length, boolean partialResultsPrint) {
317         long i=0;
318         long start=System.currentTimeMillis();
319         long tempstart=System.currentTimeMillis();
320         long stop, throughput;
321         try {
322             for(i=0; i < n; i++) {
323                 for(int k=0; k < burst_length; k++)
324                     channel.send(new Message(null, null, new Long JavaDoc(i)));
325                 for(int j=0; j < (view.size() * burst_length); j++) {
326                     channel.receive(20000);
327                 }
328                 if(partialResultsPrint && ((i % 1000) == 0)) {
329                     if(i == 0) continue;
330                     stop=System.currentTimeMillis();
331                     throughput=(1000000 / (stop - tempstart)) * view.size() * burst_length;
332                     try {
333                         System.out.println(new Date JavaDoc(stop).toString() + " : " + throughput);
334                         logWriter.write(new Date JavaDoc(stop).toString() + " : " + throughput);
335                         logWriter.write("\r\n");
336                         logWriter.flush();
337                         tempstart=System.currentTimeMillis();
338                     }
339                     catch(Exception JavaDoc ex) {
340                         ex.printStackTrace();
341                     }
342                 }
343             }
344         }
345         catch(TimeoutException ex) {
346             System.out.println("Timeout Receiving, round: " + i);
347             System.exit(5);
348         }
349         catch(Exception JavaDoc ex) {
350             ex.printStackTrace();
351             System.exit(4);
352         }
353         stop=System.currentTimeMillis();
354         showStats(start, stop, n * view.size() * burst_length, burst_length);
355     }
356
357     void sweep(long tests, int burstlenght) {
358         long messagespertest=10000;
359         for(int i=0; i < tests; i++) {
360             burstlenght+=i;
361             pingpongTest(messagespertest / burstlenght, burstlenght, false);
362         }
363     }
364
365     /**
366      * Makes a continous test handling view changes
367      */

368     void cpingTest(int burst_lenght, boolean printoutput) {
369         Object JavaDoc recvd=null;
370         long start=System.currentTimeMillis();
371         for(long i=1; i < Long.MAX_VALUE; i++) {
372 // System.out.println("Round: " + i);
373
try {
374                 channel.send(null, null, "cping");
375                 for(int j=0; j < burst_lenght * view.size();) {
376                     recvd=channel.receive(10000);
377                     if(recvd instanceof View) {
378                         view=(View)recvd;
379                         System.out.println(view);
380                         log(view.toString());
381                     }
382                     else {
383                         j++;
384                     }
385                 }
386             }
387             catch(TimeoutException tex) {
388                 try {
389                     channel.send(new Message(null, null, "cping"));
390                     System.out.println("Resent a message for timeout");
391                     log("Resent a message for timeout");
392                 }
393                 catch(Exception JavaDoc ex) {
394                     System.exit(9);
395                 }
396             }
397             catch(Exception JavaDoc ex) {
398                 System.exit(9);
399             }
400             if((i % 1000) == 0) {
401                 long stop=System.currentTimeMillis();
402                 long throughput=i * 1000 * view.size() / (stop - start);
403                 System.out.println("Througputh = " + throughput);
404                 log("Througputh = " + throughput);
405                 start=System.currentTimeMillis();
406                 i=0;
407             }
408         }
409     }
410
411     /**
412      * Used to print messages lenght and their serialized contents.
413      */

414
415     public static class MessageLenghtObserver implements ProtocolObserver {
416
417         public void setProtocol(Protocol prot) {
418             /** todo: Implement this org.jgroups.debug.ProtocolObserver method*/
419             throw new java.lang.UnsupportedOperationException JavaDoc("Method setProtocol() not yet implemented.");
420         }
421
422         public boolean up(Event evt, int num_evts) {
423             /** todo: Implement this org.jgroups.debug.ProtocolObserver method*/
424             throw new java.lang.UnsupportedOperationException JavaDoc("Method up() not yet implemented.");
425         }
426
427         public boolean passUp(Event evt) {
428             return true;
429         }
430
431         public boolean down(Event evt, int num_evts) {
432             return true;
433         }
434
435         public boolean passDown(Event evt) {
436             byte[] buf=null;
437             if(evt.getType() == Event.MSG)
438                 try {
439                     buf=Util.objectToByteBuffer(evt.getArg());
440                     System.out.println("UDP: sending a message of " +
441                             buf.length +
442                             "bytes");
443                     System.out.println("Message was :");
444                     System.out.println(new String JavaDoc(buf));
445                 }
446                 catch(Exception JavaDoc ex) {
447
448                 }
449             return true;
450         }
451     }
452
453     void log(String JavaDoc str) {
454         try {
455             logWriter.write(str + "\r\n");
456             logWriter.flush();
457         }
458         catch(Exception JavaDoc ex) {
459
460         }
461     }
462 }
463
Popular Tags