package org.jgroups.tests.adapttcp;

import org.apache.log4j.Logger;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;


/**
 * Receiving side of a TCP throughput test. Accepts connections on a server socket, starts one
 * {@link Receiver} thread per accepted connection and counts the messages read by all of them.
 * Once <code>expected_msgs</code> messages have been counted, {@link #run()} prints (and logs) the
 * elapsed time and the overall throughput.
 * <p>
 * Wire format, as read by {@link Receiver#readMessage(DataInputStream)}: a 4-byte length
 * (<code>DataInputStream.readInt()</code>) followed by that many payload bytes.
 * <p>
 * Thread-safety: <code>counter</code>, <code>beginning</code>, <code>last_dump</code> and
 * <code>started</code> are updated while holding <code>counter_mutex</code>; <code>done</code> is
 * written and waited on while holding <code>signal</code>.
 */
public class ReceiverThread extends Thread {
    /** Nominal message size in bytes; used only for the throughput figures, not for reading. */
    private int msg_size;
    private int num_senders;
    /** Total number of messages to count before the test is considered finished. */
    private long expected_msgs;
    Logger log=Logger.getLogger(this.getClass());
    /** Number of messages received so far by all receivers. */
    long counter=0;
    long beginning=0, ending=0, elapsed_time, last_dump;
    /** Statistics are logged every <code>log_interval</code> messages; values &lt;= 0 disable this. */
    long log_interval=1000;
    boolean gnuplot_output=Boolean.getBoolean("gnuplot_output");
    ServerSocket srv_sock;
    /** Receiver threads started so far; only touched by the acceptor thread. */
    List receivers=new ArrayList();
    Object signal=new Object();
    Object counter_mutex=new Object();
    boolean done=false;
    boolean started=false;


    /**
     * @param srv_sock     server socket on which sender connections are accepted
     * @param num_msgs     number of messages expected from each sender
     * @param ms           nominal message size in bytes (for throughput computation)
     * @param ns           number of senders
     * @param log_interval number of messages between two statistics dumps
     */
    public ReceiverThread(ServerSocket srv_sock, int num_msgs, int ms, int ns, long log_interval) {
        msg_size=ms;
        num_senders=ns;
        // widen before multiplying: int * int could overflow for large test runs
        expected_msgs=(long)num_msgs * num_senders;
        this.log_interval=log_interval;
        this.srv_sock=srv_sock;
    }


    /**
     * Starts a daemon acceptor thread, blocks until the receivers have counted
     * <code>expected_msgs</code> messages and then prints and logs the results.
     */
    public void run() {
        double throughput_s, throughput_b;
        System.out.println("\nReceiver thread started...\n");
        counter=0;
        beginning=0;
        ending=0;

        if(gnuplot_output) {
            // column header for the gnuplot-style output produced by dumpStats()
            StringBuffer sb=new StringBuffer();
            sb.append("\n##### msgs_received");
            sb.append(", free_mem [KB] ");
            sb.append(", total_mem [KB] ");
            sb.append(", total_msgs_sec [msgs/sec] ");
            sb.append(", total_throughput [KB/sec] ");
            sb.append(", rolling_msgs_sec (last ").append(log_interval).append(" msgs) ");
            sb.append(" [msgs/sec] ");
            sb.append(", rolling_throughput (last ").append(log_interval).append(" msgs) ");
            sb.append(" [KB/sec]\n");
            if(log.isInfoEnabled()) log.info(sb.toString());
        }

        // accepts connections until accept() fails; each connection gets its own Receiver
        Thread acceptor=new Thread() {
            public void run() {
                while(true) {
                    try {
                        Socket s=srv_sock.accept();
                        Receiver r=new Receiver(s);
                        r.setDaemon(true);
                        receivers.add(r);
                        r.start();
                    }
                    catch(Exception ex) {
                        ex.printStackTrace();
                        break;
                    }
                }
            }
        };
        acceptor.setDaemon(true);
        acceptor.start();

        // Keep waiting until a Receiver sets 'done'. Interrupts do not abort the wait (as before),
        // but the interrupt status is remembered and restored instead of being lost.
        boolean interrupted=false;
        synchronized(signal) {
            while(!done) {
                try {
                    signal.wait();
                }
                catch(InterruptedException ex) {
                    interrupted=true;
                }
            }
        }
        if(interrupted)
            Thread.currentThread().interrupt();

        elapsed_time=(ending - beginning);

        System.out.println("expected_msgs=" + expected_msgs + ", elapsed_time=" + elapsed_time);

        throughput_s=expected_msgs / (elapsed_time/1000.0);
        throughput_b=(expected_msgs * (msg_size/1000.0)) / (elapsed_time/1000.0);

        String result="Received " + expected_msgs + " msgs. in " + elapsed_time + " msec.\n" +
                "Throughput: " + throughput_s + " [msgs/sec]\n" +
                "Throughput: " + throughput_b + " [KB/sec]\n" +
                "Total received: " + expected_msgs * (msg_size / 1000.0 / 1000.0) + " [MB]\n";
        System.out.println(result);
        if(log.isInfoEnabled()) log.info(result);
    }


    /**
     * Builds one statistics record: message count, free and total JVM memory (bytes / 1000) and
     * the throughput figures appended by {@link #dumpThroughput(StringBuffer, long)}.
     *
     * @param received_msgs number of messages received so far
     * @return the record, space-separated if <code>gnuplot_output</code> is set, labelled otherwise
     */
    String dumpStats(long received_msgs) {
        StringBuffer sb=new StringBuffer();
        double free_kb=Runtime.getRuntime().freeMemory() / 1000.0;
        double total_kb=Runtime.getRuntime().totalMemory() / 1000.0;

        if(gnuplot_output) {
            sb.append(received_msgs).append(' ');
            sb.append(free_kb).append(' ');
            sb.append(total_kb).append(' ');
        }
        else {
            sb.append("\nmsgs_received=").append(received_msgs);
            sb.append(", free_mem=").append(free_kb);
            sb.append(", total_mem=").append(total_kb).append('\n');
        }

        dumpThroughput(sb, received_msgs);
        return sb.toString();
    }

    /**
     * Appends total and rolling message rates and throughput to <code>sb</code> and sets
     * <code>last_dump</code> to the current time. The Receiver calls this (via dumpStats) while
     * holding <code>counter_mutex</code>.
     * <p>
     * All rates are computed in floating point, and elapsed intervals are clamped to at least
     * 1 ms: the former long arithmetic truncated the results and threw an ArithmeticException
     * (division by zero) when a dump happened within the same millisecond as the previous one.
     *
     * @param sb            buffer to append to
     * @param received_msgs number of messages received so far
     */
    void dumpThroughput(StringBuffer sb, long received_msgs) {
        double tmp;
        long current=System.currentTimeMillis();
        double total_ms=Math.max(1L, current - beginning);
        double rolling_ms=Math.max(1L, current - last_dump);

        tmp=(1000.0 * received_msgs) / total_ms;
        if(gnuplot_output)
            sb.append(tmp).append(' ');
        else
            sb.append("total_msgs_sec=").append(tmp).append(" [msgs/sec]");

        // bytes per millisecond == KB (1000 bytes) per second
        tmp=((double)received_msgs * msg_size) / total_ms;
        if(gnuplot_output)
            sb.append(tmp).append(' ');
        else
            sb.append("\ntotal_throughput=").append(tmp).append(" [KB/sec]");

        tmp=(1000.0 * log_interval) / rolling_ms;
        if(gnuplot_output)
            sb.append(tmp).append(' ');
        else {
            sb.append("\nrolling_msgs_sec (last ").append(log_interval).append(" msgs)=");
            sb.append(tmp).append(" [msgs/sec]");
        }

        tmp=((double)log_interval * msg_size) / rolling_ms;
        if(gnuplot_output)
            sb.append(tmp).append(' ');
        else {
            sb.append("\nrolling_throughput (last ").append(log_interval).append(" msgs)=");
            sb.append(tmp).append(" [KB/sec]\n");
        }
        last_dump=current;
    }


    /**
     * Wakes up all threads waiting on <code>signal</code>.
     * NOTE(review): this does not set the <code>done</code> flag, so the wait loop in run() goes
     * straight back to waiting; confirm whether callers expect this to terminate the run.
     */
    void done() {
        synchronized(signal) {
            System.out.println("** notify()");
            signal.notifyAll();
        }
    }


    /**
     * Reads messages from one accepted connection and updates the shared counters of the
     * enclosing ReceiverThread. Terminates when the expected number of messages has been
     * counted, when reading fails (including the 5 second read timeout) or after stopThread().
     */
    class Receiver extends Thread {
        /** Set to null by stopThread(), typically from another thread, hence volatile. */
        volatile Socket sock;
        DataInputStream in;

        Receiver(Socket sock) throws Exception {
            this.sock=sock;
            sock.setSoTimeout(5000);
            in=new DataInputStream(new BufferedInputStream(sock.getInputStream()));
        }

        public void run() {
            while(sock != null && counter < expected_msgs) {
                try {
                    readMessage(in);

                    synchronized(counter_mutex) {
                        if(!started) { // first message overall: start the clock
                            beginning=System.currentTimeMillis();
                            last_dump=beginning;
                            started=true;
                        }
                        counter++;
                        if(counter % 1000 == 0) {
                            System.out.println("-- received " + counter + " msgs");
                        }
                        // log_interval <= 0 would make the modulo throw and kill this receiver
                        if(log_interval > 0 && counter % log_interval == 0) {
                            if(log.isInfoEnabled()) log.info(dumpStats(counter));
                        }
                        if(counter >= expected_msgs && !done) {
                            ending=System.currentTimeMillis();
                            synchronized(signal) {
                                done=true;
                                signal.notifyAll();
                            }
                        }
                    }
                }
                catch(Exception ex) {
                    // Any failure ends this receiver. Report it unless the receiver was stopped
                    // deliberately or the test is already complete (e.g. idle read timeout).
                    if(sock != null && counter < expected_msgs)
                        log.warn("receiver terminated after " + counter + " of " + expected_msgs +
                                " msgs: " + ex);
                    break;
                }
            }
        }

        /** Stops this receiver: marks it as stopped, then closes the socket to unblock a read. */
        void stopThread() {
            Socket tmp=sock;
            sock=null; // publish the stop request before close() makes a blocked read fail
            if(tmp == null)
                return;
            try {
                tmp.close();
            }
            catch(Exception ignored) {
                // best effort: nothing useful can be done if close() fails
            }
        }


        /**
         * Reads and discards one message: a 4-byte length followed by that many bytes.
         *
         * @throws Exception if the stream ends, times out or the length is not a valid array size
         */
        void readMessage(DataInputStream in) throws Exception {
            int len=in.readInt();
            byte[] buf=new byte[len];
            in.readFully(buf, 0, len);
        }
    }
}