1 package org.jgroups.tests.adaptjms; 2 3 import org.apache.log4j.Logger; 4 5 import javax.jms.*; 6 import java.io.ByteArrayInputStream ; 7 import java.io.ObjectInputStream ; 8 import java.net.DatagramPacket ; 9 import java.util.ArrayList ; 10 import java.util.List ; 11 12 23 public class ReceiverThread implements MessageListener { 24 private int msg_size; 25 private int num_senders; 26 private long expected_msgs; 27 Logger log=Logger.getLogger(this.getClass()); 28 long counter=1; 29 long beginning=0, ending=0, elapsed_time, last_dump; 30 long log_interval=1000; 31 boolean gnuplot_output=Boolean.getBoolean("gnuplot_output"); 32 TopicSession session; 33 List receivers=new ArrayList (); 34 Object counter_mutex=new Object (); 35 boolean started=false; 36 37 double throughput_s, throughput_b; 38 boolean done=false; 39 Request req; 40 byte[] buf=new byte[300000]; 41 DatagramPacket p=new DatagramPacket (buf, buf.length); 42 ByteArrayInputStream input; 43 ObjectInputStream in; 44 45 46 public ReceiverThread(TopicSession session, Topic topic, int num_msgs, int ms, int ns, long log_interval) throws JMSException { 47 msg_size=ms; 48 num_senders=ns; 49 expected_msgs=num_msgs * num_senders; 50 this.log_interval=log_interval; 51 this.session=session; 52 TopicSubscriber sub=session.createSubscriber(topic); 53 sub.setMessageListener(this); 54 } 55 56 public void start() { 57 System.out.println("\nReceiver started...\n"); 58 counter=1; 59 beginning=0; 60 ending=0; 61 } 62 63 public void onMessage(Message message) { 64 if(done) 65 return; 66 67 if(message instanceof ObjectMessage) { 68 Request req=(Request)message; 69 if(req.type != Request.DATA) 70 return; 71 synchronized(counter_mutex) { 72 if(counter == 1 && !started) { 73 beginning=System.currentTimeMillis(); 74 last_dump=beginning; 75 started=true; 76 } 77 counter++; 78 if(counter % 100 == 0) { 79 System.out.println("-- received " + counter + " msgs"); 80 } 81 if(counter % log_interval == 0) { 82 if(log.isInfoEnabled()) log.info(dumpStats(counter)); 83 } 84 if(counter >= expected_msgs && !done) { 85 ending=System.currentTimeMillis(); 86 done=true; 87 } 88 } 89 90 if(counter >= expected_msgs) { 91 done=true; 92 if(gnuplot_output) { 93 StringBuffer sb=new StringBuffer (); 94 sb.append("\n##### msgs_received"); 95 sb.append(", free_mem [KB] "); 96 sb.append(", total_mem [KB] "); 97 sb.append(", total_msgs_sec [msgs/sec] "); 98 sb.append(", total_throughput [KB/sec] "); 99 sb.append(", rolling_msgs_sec (last ").append(log_interval).append(" msgs) "); 100 sb.append(" [msgs/sec] "); 101 sb.append(", rolling_throughput (last ").append(log_interval).append(" msgs) "); 102 sb.append(" [KB/sec]\n"); 103 if(log.isInfoEnabled()) log.info(sb.toString()); 104 } 105 elapsed_time=(ending - beginning); 106 107 System.out.println("expected_msgs=" + expected_msgs + ", elapsed_time=" + elapsed_time); 108 109 throughput_s=expected_msgs / (elapsed_time/1000.0); 110 throughput_b=(expected_msgs * (msg_size/1000.0)) / (elapsed_time/1000.0); 111 112 String result="Received " + expected_msgs + " msgs. in " + elapsed_time + " msec.\n" + 113 "Throughput: " + throughput_s + " [msgs/sec]\n" + 114 "Throughput: " + throughput_b + " [KB/sec]\n" + 115 "Total received: " + expected_msgs * (msg_size / 1000.0 / 1000.0) + " [MB]\n"; 116 System.out.println(result); 117 if(log.isInfoEnabled()) log.info(result); 118 } 119 } 120 } 121 122 123 124 String dumpStats(long received_msgs) { 125 StringBuffer sb=new StringBuffer (); 126 if(gnuplot_output) 127 sb.append(received_msgs).append(' '); 128 else 129 sb.append("\nmsgs_received=").append(received_msgs); 130 131 if(gnuplot_output) 132 sb.append(Runtime.getRuntime().freeMemory() / 1000.0).append(' '); 133 else 134 sb.append(", free_mem=").append(Runtime.getRuntime().freeMemory() / 1000.0); 135 136 if(gnuplot_output) 137 sb.append(Runtime.getRuntime().totalMemory() / 1000.0).append(' '); 138 else 139 sb.append(", total_mem=").append(Runtime.getRuntime().totalMemory() / 1000.0).append('\n'); 140 141 dumpThroughput(sb, received_msgs); 142 return sb.toString(); 143 } 144 145 void dumpThroughput(StringBuffer sb, long received_msgs) { 146 double tmp; 147 long current=System.currentTimeMillis(); 148 149 tmp=(1000 * counter) / (current - beginning); 150 if(gnuplot_output) 151 sb.append(tmp).append(' '); 152 else 153 sb.append("total_msgs_sec=").append(tmp).append(" [msgs/sec]"); 154 155 tmp=(received_msgs * msg_size) / (current - beginning); 156 if(gnuplot_output) 157 sb.append(tmp).append(' '); 158 else 159 sb.append("\ntotal_throughput=").append(tmp).append(" [KB/sec]"); 160 161 tmp=(1000 * log_interval) / (current - last_dump); 162 if(gnuplot_output) 163 sb.append(tmp).append(' '); 164 else { 165 sb.append("\nrolling_msgs_sec (last ").append(log_interval).append(" msgs)="); 166 sb.append(tmp).append(" [msgs/sec]"); 167 } 168 169 tmp=(log_interval * msg_size) / (current - last_dump); 170 if(gnuplot_output) 171 sb.append(tmp).append(' '); 172 else { 173 sb.append("\nrolling_throughput (last ").append(log_interval).append(" msgs)="); 174 sb.append(tmp).append(" [KB/sec]\n"); 175 } 176 last_dump=current; 177 } 178 179 180 181 } 182 | Popular Tags |