1 package org.jgroups.tests.adaptudp; 2 3 import org.apache.log4j.Logger; 4 5 import java.io.ByteArrayInputStream ; 6 import java.io.ObjectInputStream ; 7 import java.net.DatagramPacket ; 8 import java.net.MulticastSocket ; 9 import java.util.ArrayList ; 10 import java.util.List ; 11 12 23 public class ReceiverThread extends Thread { 24 private int msg_size; 25 private int num_senders; 26 private long expected_msgs; 27 Logger log=Logger.getLogger(this.getClass()); 28 long counter=1; 29 long beginning=0, ending=0, elapsed_time, last_dump; 30 long log_interval=1000; 31 boolean gnuplot_output=Boolean.getBoolean("gnuplot_output"); 32 MulticastSocket recv_sock; 33 List receivers=new ArrayList (); 34 Object counter_mutex=new Object (); 35 boolean started=false; 36 37 38 public ReceiverThread(MulticastSocket recv_sock, int num_msgs, int ms, int ns, long log_interval) { 39 msg_size=ms; 40 num_senders=ns; 41 expected_msgs=num_msgs * num_senders; 42 this.log_interval=log_interval; 43 this.recv_sock=recv_sock; 44 } 45 46 47 48 public void run() { 49 double throughput_s, throughput_b; 50 System.out.println("\nReceiver thread started...\n"); 51 counter=1; 52 beginning=0; 53 ending=0; 54 boolean done=false; 55 Request req; 56 byte[] buf=new byte[300000]; 57 DatagramPacket p=new DatagramPacket (buf, buf.length); 58 ByteArrayInputStream input; 59 ObjectInputStream in; 60 61 while(recv_sock != null && counter < expected_msgs && !done) { 62 try { 63 p.setData(buf); 64 recv_sock.receive(p); 65 input=new ByteArrayInputStream (p.getData(), 0, p.getLength()); 66 in=new ObjectInputStream (input); 67 req=(Request)in.readObject(); 68 if(req.type != Request.DATA) 69 continue; 70 71 synchronized(counter_mutex) { 72 if(counter == 1 && !started) { 73 beginning=System.currentTimeMillis(); 74 last_dump=beginning; 75 started=true; 76 } 77 counter++; 78 if(counter % 100 == 0) { 79 System.out.println("-- received " + counter + " msgs"); 80 } 81 if(counter % log_interval == 0) { 82 if(log.isInfoEnabled()) log.info(dumpStats(counter)); 83 } 84 if(counter >= expected_msgs && !done) { 85 ending=System.currentTimeMillis(); 86 done=true; 87 } 88 } 89 } 90 catch(Exception ex) { 91 if(recv_sock == null) return; 92 break; 93 } 94 } 95 96 97 if(gnuplot_output) { 98 StringBuffer sb=new StringBuffer (); 99 sb.append("\n##### msgs_received"); 100 sb.append(", free_mem [KB] "); 101 sb.append(", total_mem [KB] "); 102 sb.append(", total_msgs_sec [msgs/sec] "); 103 sb.append(", total_throughput [KB/sec] "); 104 sb.append(", rolling_msgs_sec (last ").append(log_interval).append(" msgs) "); 105 sb.append(" [msgs/sec] "); 106 sb.append(", rolling_throughput (last ").append(log_interval).append(" msgs) "); 107 sb.append(" [KB/sec]\n"); 108 if(log.isInfoEnabled()) log.info(sb.toString()); 109 } 110 111 112 113 114 elapsed_time=(ending - beginning); 115 116 System.out.println("expected_msgs=" + expected_msgs + ", elapsed_time=" + elapsed_time); 117 118 throughput_s=expected_msgs / (elapsed_time/1000.0); 119 throughput_b=(expected_msgs * (msg_size/1000.0)) / (elapsed_time/1000.0); 120 121 String result="Received " + expected_msgs + " msgs. in " + elapsed_time + " msec.\n" + 122 "Throughput: " + throughput_s + " [msgs/sec]\n" + 123 "Throughput: " + throughput_b + " [KB/sec]\n" + 124 "Total received: " + expected_msgs * (msg_size / 1000.0 / 1000.0) + " [MB]\n"; 125 System.out.println(result); 126 if(log.isInfoEnabled()) log.info(result); 127 } 128 129 130 String dumpStats(long received_msgs) { 131 StringBuffer sb=new StringBuffer (); 132 if(gnuplot_output) 133 sb.append(received_msgs).append(' '); 134 else 135 sb.append("\nmsgs_received=").append(received_msgs); 136 137 if(gnuplot_output) 138 sb.append(Runtime.getRuntime().freeMemory() / 1000.0).append(' '); 139 else 140 sb.append(", free_mem=").append(Runtime.getRuntime().freeMemory() / 1000.0); 141 142 if(gnuplot_output) 143 sb.append(Runtime.getRuntime().totalMemory() / 1000.0).append(' '); 144 else 145 sb.append(", total_mem=").append(Runtime.getRuntime().totalMemory() / 1000.0).append('\n'); 146 147 dumpThroughput(sb, received_msgs); 148 return sb.toString(); 149 } 150 151 void dumpThroughput(StringBuffer sb, long received_msgs) { 152 double tmp; 153 long current=System.currentTimeMillis(); 154 155 tmp=(1000 * counter) / (current - beginning); 156 if(gnuplot_output) 157 sb.append(tmp).append(' '); 158 else 159 sb.append("total_msgs_sec=").append(tmp).append(" [msgs/sec]"); 160 161 tmp=(received_msgs * msg_size) / (current - beginning); 162 if(gnuplot_output) 163 sb.append(tmp).append(' '); 164 else 165 sb.append("\ntotal_throughput=").append(tmp).append(" [KB/sec]"); 166 167 tmp=(1000 * log_interval) / (current - last_dump); 168 if(gnuplot_output) 169 sb.append(tmp).append(' '); 170 else { 171 sb.append("\nrolling_msgs_sec (last ").append(log_interval).append(" msgs)="); 172 sb.append(tmp).append(" [msgs/sec]"); 173 } 174 175 tmp=(log_interval * msg_size) / (current - last_dump); 176 if(gnuplot_output) 177 sb.append(tmp).append(' '); 178 else { 179 sb.append("\nrolling_throughput (last ").append(log_interval).append(" msgs)="); 180 sb.append(tmp).append(" [KB/sec]\n"); 181 } 182 last_dump=current; 183 } 184 185 } 186 | Popular Tags |