KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > adapt > ReceiverThread


1 package org.jgroups.tests.adapt;
2
3 import org.apache.log4j.Logger;
4 import org.jgroups.*;
5
6 /** Receiver thread: loops until it receives the expected
7  * number of messages. It measures the elapsed time between
8  * the reception of the first and the last message and
9  * calculates the throughputs. At the end, it closes the
10  * channel connection. (Actually the elapsed time refers
11  * to the reception of expected_msgs - 1, but that's not
12  * important.)
13  * @author Milcan Prica (prica@deei.units.it)
14  * @author Bela Ban (belaban@yahoo.com)
15
16  */

17 public class ReceiverThread extends Thread JavaDoc {
18
19     private Channel channel;
20     private int num_msgs;
21     private int msg_size;
22     private int num_senders;
23     Logger log=Logger.getLogger(this.getClass());
24     long counter=1;
25     long expected_msgs=num_msgs * num_senders;
26     long beginning=0, ending=0, elapsed_time, last_dump;
27     long log_interval=1000;
28     boolean gnuplot_output=Boolean.getBoolean("gnuplot_output");
29
30
31     public ReceiverThread(Channel ch, int num_msgs, int msg_size, int ns, long log_interval) {
32         channel=ch;
33         this.num_msgs=num_msgs;
34         this.msg_size=msg_size;
35         num_senders=ns;
36         expected_msgs=num_msgs * num_senders;
37         this.log_interval=log_interval;
38     }
39
40     public void run() {
41         double throughput_s, throughput_b;
42         System.out.println("\nReceiver thread started...\n");
43         counter=1;
44         beginning=0;
45         ending=0;
46
47         if(gnuplot_output) {
48             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
49             sb.append("\n##### msgs_received");
50             sb.append(", free_mem [KB] ");
51             sb.append(", total_mem [KB] ");
52             sb.append(", total_msgs_sec [msgs/sec] ");
53             sb.append(", total_throughput [KB/sec] ");
54             sb.append(", rolling_msgs_sec (last ").append(log_interval).append(" msgs) ");
55             sb.append(" [msgs/sec] ");
56             sb.append(", rolling_throughput (last ").append(log_interval).append(" msgs) ");
57             sb.append(" [KB/sec]\n");
58             if(log.isInfoEnabled()) log.info(sb.toString());
59         }
60
61
62         while(counter <= expected_msgs) {
63             try {
64                 Object JavaDoc received=channel.receive(60000);
65                 if(received instanceof Message) {
66                     if(counter == 1) {
67                         beginning=System.currentTimeMillis();
68                         last_dump=beginning;
69                     }
70                     if(counter == expected_msgs)
71                         ending=System.currentTimeMillis();
72                     counter++;
73                     if(counter % 1000 == 0) {
74                         System.out.println("-- received " + counter + " msgs");
75                     }
76                     if(counter % log_interval == 0) {
77                         if(log.isInfoEnabled()) log.info(dumpStats(counter));
78                     }
79                 }
80             }
81             catch(ClassCastException JavaDoc e) {
82                 continue;
83             }
84             catch(ChannelNotConnectedException e) {
85                 e.printStackTrace();
86             }
87             catch(ChannelClosedException e) {
88                 e.printStackTrace();
89             }
90             catch(TimeoutException e) {
91                 ending=System.currentTimeMillis();
92                 System.out.println("Received " + counter + " / " +
93                         expected_msgs + " messages");
94                 expected_msgs=counter;
95                 break;
96             }
97         }
98         elapsed_time=(ending - beginning);
99
100         System.out.println("expected_msgs=" + expected_msgs + ", elapsed_time=" + elapsed_time);
101
102         throughput_s=expected_msgs / (elapsed_time/1000.0);
103         throughput_b=(expected_msgs * (msg_size/1000.0)) / (elapsed_time/1000.0);
104
105         String JavaDoc result="Received " + expected_msgs + " msgs. in " + elapsed_time + " msec.\n" +
106                 "Throughput: " + throughput_s + " [msgs/sec]\n" +
107                 "Throughput: " + throughput_b + " [KB/sec]\n" +
108                 "Total received: " + expected_msgs * (msg_size / 1000.0 / 1000.0) + " [MB]\n";
109         System.out.println(result);
110         if(log.isInfoEnabled()) log.info(result);
111
112         //String xmit_stats=NAKACK.dumpXmitStats();
113
//if(log.isInfoEnabled()) log.info("TRACE.special()", "stats:\n\n" + xmit_stats);
114

115         long sleep_time=10000;
116
117         System.out.println("-- sleeping for " + (sleep_time / 1000) + " seconds to allow for retransmissions");
118         try {
119             Thread.sleep(sleep_time);
120         }
121         catch(Throwable JavaDoc t) {
122         }
123
124         channel.close(); // Apparently does not work properly?
125
System.exit(0); // Radical alternative to channel.close().
126
}
127
128
129     String JavaDoc dumpStats(long received_msgs) {
130         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
131         if(gnuplot_output)
132             sb.append(received_msgs).append(' ');
133         else
134             sb.append("\nmsgs_received=").append(received_msgs);
135
136         if(gnuplot_output)
137             sb.append(Runtime.getRuntime().freeMemory() / 1000.0).append(' ');
138         else
139             sb.append(", free_mem=").append(Runtime.getRuntime().freeMemory() / 1000.0);
140
141         if(gnuplot_output)
142             sb.append(Runtime.getRuntime().totalMemory() / 1000.0).append(' ');
143         else
144             sb.append(", total_mem=").append(Runtime.getRuntime().totalMemory() / 1000.0).append('\n');
145
146         dumpThroughput(sb, received_msgs);
147         return sb.toString();
148     }
149
150     void dumpThroughput(StringBuffer JavaDoc sb, long received_msgs) {
151         double tmp;
152         long current=System.currentTimeMillis();
153
154         if(last_dump == 0 || (current - last_dump) <= 0)
155             return;
156
157         tmp=(1000 * counter) / (current - beginning);
158         if(gnuplot_output)
159             sb.append(tmp).append(' ');
160         else
161             sb.append("total_msgs_sec=").append(tmp).append(" [msgs/sec]");
162
163         tmp=(received_msgs * msg_size) / (current - beginning);
164         if(gnuplot_output)
165             sb.append(tmp).append(' ');
166         else
167             sb.append("\ntotal_throughput=").append(tmp).append(" [KB/sec]");
168
169         tmp=(1000 * log_interval) / (current - last_dump);
170         if(gnuplot_output)
171             sb.append(tmp).append(' ');
172         else {
173             sb.append("\nrolling_msgs_sec (last ").append(log_interval).append(" msgs)=");
174             sb.append(tmp).append(" [msgs/sec]");
175         }
176
177         tmp=(log_interval * msg_size) / (current - last_dump);
178         if(gnuplot_output)
179             sb.append(tmp).append(' ');
180         else {
181             sb.append("\nrolling_throughput (last ").append(log_interval).append(" msgs)=");
182             sb.append(tmp).append(" [KB/sec]\n");
183         }
184         last_dump=current;
185     }
186 }
187
Popular Tags