KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > adaptudp > ReceiverThread


1 package org.jgroups.tests.adaptudp;
2
3 import org.apache.log4j.Logger;
4
5 import java.io.ByteArrayInputStream JavaDoc;
6 import java.io.ObjectInputStream JavaDoc;
7 import java.net.DatagramPacket JavaDoc;
8 import java.net.MulticastSocket JavaDoc;
9 import java.util.ArrayList JavaDoc;
10 import java.util.List JavaDoc;
11
12 /** Receiver thread: loops until it receives the expected
13  * number of messages. It measures the elapsed time between
14  * the reception of the first and the last message and
15  * calculates the throughputs. At the end, it closes the
16  * channel connection. (Actually the elapsed time refers
17  * to the reception of expected_msgs - 1, but that's not
18  * important.)
19  * @author Milcan Prica (prica@deei.units.it)
20  * @author Bela Ban (belaban@yahoo.com)
21
22  */

23 public class ReceiverThread extends Thread JavaDoc {
24     private int msg_size;
25     private int num_senders;
26     private long expected_msgs;
27     Logger log=Logger.getLogger(this.getClass());
28     long counter=1;
29     long beginning=0, ending=0, elapsed_time, last_dump;
30     long log_interval=1000;
31     boolean gnuplot_output=Boolean.getBoolean("gnuplot_output");
32     MulticastSocket JavaDoc recv_sock;
33     List JavaDoc receivers=new ArrayList JavaDoc();
34     Object JavaDoc counter_mutex=new Object JavaDoc();
35     boolean started=false;
36
37
38     public ReceiverThread(MulticastSocket JavaDoc recv_sock, int num_msgs, int ms, int ns, long log_interval) {
39         msg_size=ms;
40         num_senders=ns;
41         expected_msgs=num_msgs * num_senders;
42         this.log_interval=log_interval;
43         this.recv_sock=recv_sock;
44     }
45
46
47
48     public void run() {
49         double throughput_s, throughput_b;
50         System.out.println("\nReceiver thread started...\n");
51         counter=1;
52         beginning=0;
53         ending=0;
54         boolean done=false;
55         Request req;
56         byte[] buf=new byte[300000];
57         DatagramPacket JavaDoc p=new DatagramPacket JavaDoc(buf, buf.length);
58         ByteArrayInputStream JavaDoc input;
59         ObjectInputStream JavaDoc in;
60
61         while(recv_sock != null && counter < expected_msgs && !done) {
62             try {
63                 p.setData(buf);
64                 recv_sock.receive(p);
65                 input=new ByteArrayInputStream JavaDoc(p.getData(), 0, p.getLength());
66                 in=new ObjectInputStream JavaDoc(input);
67                 req=(Request)in.readObject();
68                 if(req.type != Request.DATA)
69                     continue;
70
71                 synchronized(counter_mutex) {
72                     if(counter == 1 && !started) {
73                         beginning=System.currentTimeMillis();
74                         last_dump=beginning;
75                         started=true;
76                     }
77                     counter++;
78                     if(counter % 100 == 0) {
79                         System.out.println("-- received " + counter + " msgs");
80                     }
81                     if(counter % log_interval == 0) {
82                         if(log.isInfoEnabled()) log.info(dumpStats(counter));
83                     }
84                     if(counter >= expected_msgs && !done) {
85                         ending=System.currentTimeMillis();
86                         done=true;
87                     }
88                 }
89             }
90             catch(Exception JavaDoc ex) {
91                 if(recv_sock == null) return;
92                 break;
93             }
94         }
95
96
97         if(gnuplot_output) {
98             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
99             sb.append("\n##### msgs_received");
100             sb.append(", free_mem [KB] ");
101             sb.append(", total_mem [KB] ");
102             sb.append(", total_msgs_sec [msgs/sec] ");
103             sb.append(", total_throughput [KB/sec] ");
104             sb.append(", rolling_msgs_sec (last ").append(log_interval).append(" msgs) ");
105             sb.append(" [msgs/sec] ");
106             sb.append(", rolling_throughput (last ").append(log_interval).append(" msgs) ");
107             sb.append(" [KB/sec]\n");
108             if(log.isInfoEnabled()) log.info(sb.toString());
109         }
110
111
112
113
114         elapsed_time=(ending - beginning);
115
116         System.out.println("expected_msgs=" + expected_msgs + ", elapsed_time=" + elapsed_time);
117
118         throughput_s=expected_msgs / (elapsed_time/1000.0);
119         throughput_b=(expected_msgs * (msg_size/1000.0)) / (elapsed_time/1000.0);
120
121         String JavaDoc result="Received " + expected_msgs + " msgs. in " + elapsed_time + " msec.\n" +
122                 "Throughput: " + throughput_s + " [msgs/sec]\n" +
123                 "Throughput: " + throughput_b + " [KB/sec]\n" +
124                 "Total received: " + expected_msgs * (msg_size / 1000.0 / 1000.0) + " [MB]\n";
125         System.out.println(result);
126         if(log.isInfoEnabled()) log.info(result);
127     }
128
129
130     String JavaDoc dumpStats(long received_msgs) {
131         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
132         if(gnuplot_output)
133             sb.append(received_msgs).append(' ');
134         else
135             sb.append("\nmsgs_received=").append(received_msgs);
136
137         if(gnuplot_output)
138             sb.append(Runtime.getRuntime().freeMemory() / 1000.0).append(' ');
139         else
140             sb.append(", free_mem=").append(Runtime.getRuntime().freeMemory() / 1000.0);
141
142         if(gnuplot_output)
143             sb.append(Runtime.getRuntime().totalMemory() / 1000.0).append(' ');
144         else
145             sb.append(", total_mem=").append(Runtime.getRuntime().totalMemory() / 1000.0).append('\n');
146
147         dumpThroughput(sb, received_msgs);
148         return sb.toString();
149     }
150
151     void dumpThroughput(StringBuffer JavaDoc sb, long received_msgs) {
152         double tmp;
153         long current=System.currentTimeMillis();
154
155         tmp=(1000 * counter) / (current - beginning);
156         if(gnuplot_output)
157             sb.append(tmp).append(' ');
158         else
159             sb.append("total_msgs_sec=").append(tmp).append(" [msgs/sec]");
160
161         tmp=(received_msgs * msg_size) / (current - beginning);
162         if(gnuplot_output)
163             sb.append(tmp).append(' ');
164         else
165             sb.append("\ntotal_throughput=").append(tmp).append(" [KB/sec]");
166
167         tmp=(1000 * log_interval) / (current - last_dump);
168         if(gnuplot_output)
169             sb.append(tmp).append(' ');
170         else {
171             sb.append("\nrolling_msgs_sec (last ").append(log_interval).append(" msgs)=");
172             sb.append(tmp).append(" [msgs/sec]");
173         }
174
175         tmp=(log_interval * msg_size) / (current - last_dump);
176         if(gnuplot_output)
177             sb.append(tmp).append(' ');
178         else {
179             sb.append("\nrolling_throughput (last ").append(log_interval).append(" msgs)=");
180             sb.append(tmp).append(" [KB/sec]\n");
181         }
182         last_dump=current;
183     }
184
185 }
186
Popular Tags