1 package org.jgroups.tests.adapt; 2 3 import org.apache.log4j.Logger; 4 import org.jgroups.*; 5 6 17 public class ReceiverThread extends Thread { 18 19 private Channel channel; 20 private int num_msgs; 21 private int msg_size; 22 private int num_senders; 23 Logger log=Logger.getLogger(this.getClass()); 24 long counter=1; 25 long expected_msgs=num_msgs * num_senders; 26 long beginning=0, ending=0, elapsed_time, last_dump; 27 long log_interval=1000; 28 boolean gnuplot_output=Boolean.getBoolean("gnuplot_output"); 29 30 31 public ReceiverThread(Channel ch, int num_msgs, int msg_size, int ns, long log_interval) { 32 channel=ch; 33 this.num_msgs=num_msgs; 34 this.msg_size=msg_size; 35 num_senders=ns; 36 expected_msgs=num_msgs * num_senders; 37 this.log_interval=log_interval; 38 } 39 40 public void run() { 41 double throughput_s, throughput_b; 42 System.out.println("\nReceiver thread started...\n"); 43 counter=1; 44 beginning=0; 45 ending=0; 46 47 if(gnuplot_output) { 48 StringBuffer sb=new StringBuffer (); 49 sb.append("\n##### msgs_received"); 50 sb.append(", free_mem [KB] "); 51 sb.append(", total_mem [KB] "); 52 sb.append(", total_msgs_sec [msgs/sec] "); 53 sb.append(", total_throughput [KB/sec] "); 54 sb.append(", rolling_msgs_sec (last ").append(log_interval).append(" msgs) "); 55 sb.append(" [msgs/sec] "); 56 sb.append(", rolling_throughput (last ").append(log_interval).append(" msgs) "); 57 sb.append(" [KB/sec]\n"); 58 if(log.isInfoEnabled()) log.info(sb.toString()); 59 } 60 61 62 while(counter <= expected_msgs) { 63 try { 64 Object received=channel.receive(60000); 65 if(received instanceof Message) { 66 if(counter == 1) { 67 beginning=System.currentTimeMillis(); 68 last_dump=beginning; 69 } 70 if(counter == expected_msgs) 71 ending=System.currentTimeMillis(); 72 counter++; 73 if(counter % 1000 == 0) { 74 System.out.println("-- received " + counter + " msgs"); 75 } 76 if(counter % log_interval == 0) { 77 if(log.isInfoEnabled()) log.info(dumpStats(counter)); 78 } 79 } 80 } 81 catch(ClassCastException e) { 82 continue; 83 } 84 catch(ChannelNotConnectedException e) { 85 e.printStackTrace(); 86 } 87 catch(ChannelClosedException e) { 88 e.printStackTrace(); 89 } 90 catch(TimeoutException e) { 91 ending=System.currentTimeMillis(); 92 System.out.println("Received " + counter + " / " + 93 expected_msgs + " messages"); 94 expected_msgs=counter; 95 break; 96 } 97 } 98 elapsed_time=(ending - beginning); 99 100 System.out.println("expected_msgs=" + expected_msgs + ", elapsed_time=" + elapsed_time); 101 102 throughput_s=expected_msgs / (elapsed_time/1000.0); 103 throughput_b=(expected_msgs * (msg_size/1000.0)) / (elapsed_time/1000.0); 104 105 String result="Received " + expected_msgs + " msgs. in " + elapsed_time + " msec.\n" + 106 "Throughput: " + throughput_s + " [msgs/sec]\n" + 107 "Throughput: " + throughput_b + " [KB/sec]\n" + 108 "Total received: " + expected_msgs * (msg_size / 1000.0 / 1000.0) + " [MB]\n"; 109 System.out.println(result); 110 if(log.isInfoEnabled()) log.info(result); 111 112 115 long sleep_time=10000; 116 117 System.out.println("-- sleeping for " + (sleep_time / 1000) + " seconds to allow for retransmissions"); 118 try { 119 Thread.sleep(sleep_time); 120 } 121 catch(Throwable t) { 122 } 123 124 channel.close(); System.exit(0); } 127 128 129 String dumpStats(long received_msgs) { 130 StringBuffer sb=new StringBuffer (); 131 if(gnuplot_output) 132 sb.append(received_msgs).append(' '); 133 else 134 sb.append("\nmsgs_received=").append(received_msgs); 135 136 if(gnuplot_output) 137 sb.append(Runtime.getRuntime().freeMemory() / 1000.0).append(' '); 138 else 139 sb.append(", free_mem=").append(Runtime.getRuntime().freeMemory() / 1000.0); 140 141 if(gnuplot_output) 142 sb.append(Runtime.getRuntime().totalMemory() / 1000.0).append(' '); 143 else 144 sb.append(", total_mem=").append(Runtime.getRuntime().totalMemory() / 1000.0).append('\n'); 145 146 dumpThroughput(sb, received_msgs); 147 return sb.toString(); 148 } 149 150 void dumpThroughput(StringBuffer sb, long received_msgs) { 151 double tmp; 152 long current=System.currentTimeMillis(); 153 154 if(last_dump == 0 || (current - last_dump) <= 0) 155 return; 156 157 tmp=(1000 * counter) / (current - beginning); 158 if(gnuplot_output) 159 sb.append(tmp).append(' '); 160 else 161 sb.append("total_msgs_sec=").append(tmp).append(" [msgs/sec]"); 162 163 tmp=(received_msgs * msg_size) / (current - beginning); 164 if(gnuplot_output) 165 sb.append(tmp).append(' '); 166 else 167 sb.append("\ntotal_throughput=").append(tmp).append(" [KB/sec]"); 168 169 tmp=(1000 * log_interval) / (current - last_dump); 170 if(gnuplot_output) 171 sb.append(tmp).append(' '); 172 else { 173 sb.append("\nrolling_msgs_sec (last ").append(log_interval).append(" msgs)="); 174 sb.append(tmp).append(" [msgs/sec]"); 175 } 176 177 tmp=(log_interval * msg_size) / (current - last_dump); 178 if(gnuplot_output) 179 sb.append(tmp).append(' '); 180 else { 181 sb.append("\nrolling_throughput (last ").append(log_interval).append(" msgs)="); 182 sb.append(tmp).append(" [KB/sec]\n"); 183 } 184 last_dump=current; 185 } 186 } 187 | Popular Tags |