KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > adapttcp > ReceiverThread


1 package org.jgroups.tests.adapttcp;
2
3 import org.apache.log4j.Logger;
4
5 import java.io.BufferedInputStream JavaDoc;
6 import java.io.DataInputStream JavaDoc;
7 import java.net.ServerSocket JavaDoc;
8 import java.net.Socket JavaDoc;
9 import java.util.ArrayList JavaDoc;
10 import java.util.List JavaDoc;
11
12 /** Receiver thread: loops until it receives the expected
13  * number of messages. It measures the elapsed time between
14  * the reception of the first and the last message and
15  * calculates the throughputs. At the end, it closes the
16  * channel connection. (Actually the elapsed time refers
17  * to the reception of expected_msgs - 1, but that's not
18  * important.)
19  * @author Milcan Prica (prica@deei.units.it)
20  * @author Bela Ban (belaban@yahoo.com)
21
22  */

23 public class ReceiverThread extends Thread JavaDoc {
24     private int msg_size;
25     private int num_senders;
26     private long expected_msgs;
27     Logger log=Logger.getLogger(this.getClass());
28     long counter=1;
29     long beginning=0, ending=0, elapsed_time, last_dump;
30     long log_interval=1000;
31     boolean gnuplot_output=Boolean.getBoolean("gnuplot_output");
32     ServerSocket JavaDoc srv_sock;
33     List JavaDoc receivers=new ArrayList JavaDoc();
34     Object JavaDoc signal=new Object JavaDoc();
35     Object JavaDoc counter_mutex=new Object JavaDoc();
36     boolean done=false;
37     boolean started=false;
38
39
40     public ReceiverThread(ServerSocket JavaDoc srv_sock, int num_msgs, int ms, int ns, long log_interval) {
41         msg_size=ms;
42         num_senders=ns;
43         expected_msgs=num_msgs * num_senders;
44         this.log_interval=log_interval;
45         this.srv_sock=srv_sock;
46     }
47
48
49
50     public void run() {
51         double throughput_s, throughput_b;
52         System.out.println("\nReceiver thread started...\n");
53         counter=1;
54         beginning=0;
55         ending=0;
56
57
58
59
60
61         if(gnuplot_output) {
62             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
63             sb.append("\n##### msgs_received");
64             sb.append(", free_mem [KB] ");
65             sb.append(", total_mem [KB] ");
66             sb.append(", total_msgs_sec [msgs/sec] ");
67             sb.append(", total_throughput [KB/sec] ");
68             sb.append(", rolling_msgs_sec (last ").append(log_interval).append(" msgs) ");
69             sb.append(" [msgs/sec] ");
70             sb.append(", rolling_throughput (last ").append(log_interval).append(" msgs) ");
71             sb.append(" [KB/sec]\n");
72             if(log.isInfoEnabled()) log.info(sb.toString());
73         }
74
75
76         // accept connections and start 1 Receiver per connection
77
Thread JavaDoc acceptor=new Thread JavaDoc() {
78             public void run() {
79                 while(true) {
80                     try {
81                         Socket JavaDoc s=srv_sock.accept();
82                         Receiver r=new Receiver(s);
83                         r.setDaemon(true);
84                         receivers.add(r);
85                         r.start();
86                     }
87                     catch(Exception JavaDoc ex) {
88                         ex.printStackTrace();
89                         break;
90                     }
91                 }
92             }
93         };
94         acceptor.setDaemon(true);
95         acceptor.start();
96
97         // wait for all messages
98
synchronized(signal) {
99             while(!done) {
100                 try {
101                     signal.wait();
102                 }
103                 catch(Exception JavaDoc ex) {
104                     ;
105                 }
106             }
107         }
108
109         elapsed_time=(ending - beginning);
110
111         System.out.println("expected_msgs=" + expected_msgs + ", elapsed_time=" + elapsed_time);
112
113         throughput_s=expected_msgs / (elapsed_time/1000.0);
114         throughput_b=(expected_msgs * (msg_size/1000.0)) / (elapsed_time/1000.0);
115
116         String JavaDoc result="Received " + expected_msgs + " msgs. in " + elapsed_time + " msec.\n" +
117                 "Throughput: " + throughput_s + " [msgs/sec]\n" +
118                 "Throughput: " + throughput_b + " [KB/sec]\n" +
119                 "Total received: " + expected_msgs * (msg_size / 1000.0 / 1000.0) + " [MB]\n";
120         System.out.println(result);
121         if(log.isInfoEnabled()) log.info(result);
122     }
123
124
125     String JavaDoc dumpStats(long received_msgs) {
126         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
127         if(gnuplot_output)
128             sb.append(received_msgs).append(' ');
129         else
130             sb.append("\nmsgs_received=").append(received_msgs);
131
132         if(gnuplot_output)
133             sb.append(Runtime.getRuntime().freeMemory() / 1000.0).append(' ');
134         else
135             sb.append(", free_mem=").append(Runtime.getRuntime().freeMemory() / 1000.0);
136
137         if(gnuplot_output)
138             sb.append(Runtime.getRuntime().totalMemory() / 1000.0).append(' ');
139         else
140             sb.append(", total_mem=").append(Runtime.getRuntime().totalMemory() / 1000.0).append('\n');
141
142         dumpThroughput(sb, received_msgs);
143         return sb.toString();
144     }
145
146     void dumpThroughput(StringBuffer JavaDoc sb, long received_msgs) {
147         double tmp;
148         long current=System.currentTimeMillis();
149
150         tmp=(1000 * counter) / (current - beginning);
151         if(gnuplot_output)
152             sb.append(tmp).append(' ');
153         else
154             sb.append("total_msgs_sec=").append(tmp).append(" [msgs/sec]");
155
156         tmp=(received_msgs * msg_size) / (current - beginning);
157         if(gnuplot_output)
158             sb.append(tmp).append(' ');
159         else
160             sb.append("\ntotal_throughput=").append(tmp).append(" [KB/sec]");
161
162         tmp=(1000 * log_interval) / (current - last_dump);
163         if(gnuplot_output)
164             sb.append(tmp).append(' ');
165         else {
166             sb.append("\nrolling_msgs_sec (last ").append(log_interval).append(" msgs)=");
167             sb.append(tmp).append(" [msgs/sec]");
168         }
169
170         tmp=(log_interval * msg_size) / (current - last_dump);
171         if(gnuplot_output)
172             sb.append(tmp).append(' ');
173         else {
174             sb.append("\nrolling_throughput (last ").append(log_interval).append(" msgs)=");
175             sb.append(tmp).append(" [KB/sec]\n");
176         }
177         last_dump=current;
178     }
179
180
181     void done() {
182         synchronized(signal) {
183             System.out.println("** notify()");
184             signal.notifyAll();
185         }
186     }
187
188
189
190     class Receiver extends Thread JavaDoc {
191         Socket JavaDoc sock;
192         DataInputStream JavaDoc in;
193
194         Receiver(Socket JavaDoc sock) throws Exception JavaDoc {
195             this.sock=sock;
196             sock.setSoTimeout(5000);
197             in=new DataInputStream JavaDoc(new BufferedInputStream JavaDoc(sock.getInputStream()));
198         }
199
200         public void run() {
201             while(sock != null && counter < expected_msgs) {
202                 try {
203                     readMessage(in);
204
205                     synchronized(counter_mutex) {
206                         if(counter == 1 && !started) {
207                             beginning=System.currentTimeMillis();
208                             last_dump=beginning;
209                             started=true;
210                         }
211                         counter++;
212                         if(counter % 1000 == 0) {
213                             System.out.println("-- received " + counter + " msgs");
214                         }
215                         if(counter % log_interval == 0) {
216                             if(log.isInfoEnabled()) log.info(dumpStats(counter));
217                         }
218                         if(counter >= expected_msgs && !done) {
219                             ending=System.currentTimeMillis();
220                             synchronized(signal) {
221                                 done=true;
222                                 signal.notifyAll();
223                             }
224                         }
225                     }
226                 }
227                 catch(Exception JavaDoc ex) {
228                     if(sock == null) return;
229                     // ex.printStackTrace();
230
break;
231                 }
232             }
233         }
234
235         void stopThread() {
236             try {
237                 sock.close();
238                 sock=null;
239             }
240             catch(Exception JavaDoc ex) {
241
242             }
243         }
244
245
246         void readMessage(DataInputStream JavaDoc in) throws Exception JavaDoc {
247             int len=in.readInt();
248             byte[] buf=new byte[len];
249             in.readFully(buf, 0, len);
250         }
251     }
252 }
253
Popular Tags