KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > adaptjms > ReceiverThread


1 package org.jgroups.tests.adaptjms;
2
3 import org.apache.log4j.Logger;
4
5 import javax.jms.*;
6 import java.io.ByteArrayInputStream JavaDoc;
7 import java.io.ObjectInputStream JavaDoc;
8 import java.net.DatagramPacket JavaDoc;
9 import java.util.ArrayList JavaDoc;
10 import java.util.List JavaDoc;
11
12 /** Receiver thread: loops until it receives the expected
13  * number of messages. It measures the elapsed time between
14  * the reception of the first and the last message and
15  * calculates the throughputs. At the end, it closes the
16  * channel connection. (Actually the elapsed time refers
17  * to the reception of expected_msgs - 1, but that's not
18  * important.)
19  * @author Milcan Prica (prica@deei.units.it)
20  * @author Bela Ban (belaban@yahoo.com)
21
22  */

23 public class ReceiverThread implements MessageListener {
24     private int msg_size;
25     private int num_senders;
26     private long expected_msgs;
27     Logger log=Logger.getLogger(this.getClass());
28     long counter=1;
29     long beginning=0, ending=0, elapsed_time, last_dump;
30     long log_interval=1000;
31     boolean gnuplot_output=Boolean.getBoolean("gnuplot_output");
32     TopicSession session;
33     List JavaDoc receivers=new ArrayList JavaDoc();
34     Object JavaDoc counter_mutex=new Object JavaDoc();
35     boolean started=false;
36
37     double throughput_s, throughput_b;
38     boolean done=false;
39     Request req;
40     byte[] buf=new byte[300000];
41     DatagramPacket JavaDoc p=new DatagramPacket JavaDoc(buf, buf.length);
42     ByteArrayInputStream JavaDoc input;
43     ObjectInputStream JavaDoc in;
44
45
46     public ReceiverThread(TopicSession session, Topic topic, int num_msgs, int ms, int ns, long log_interval) throws JMSException {
47         msg_size=ms;
48         num_senders=ns;
49         expected_msgs=num_msgs * num_senders;
50         this.log_interval=log_interval;
51         this.session=session;
52         TopicSubscriber sub=session.createSubscriber(topic);
53         sub.setMessageListener(this);
54     }
55
56     public void start() {
57         System.out.println("\nReceiver started...\n");
58         counter=1;
59         beginning=0;
60         ending=0;
61     }
62
63     public void onMessage(Message message) {
64         if(done)
65             return;
66
67         if(message instanceof ObjectMessage) {
68             Request req=(Request)message;
69             if(req.type != Request.DATA)
70                 return;
71             synchronized(counter_mutex) {
72                 if(counter == 1 && !started) {
73                     beginning=System.currentTimeMillis();
74                     last_dump=beginning;
75                     started=true;
76                 }
77                 counter++;
78                 if(counter % 100 == 0) {
79                     System.out.println("-- received " + counter + " msgs");
80                 }
81                 if(counter % log_interval == 0) {
82                     if(log.isInfoEnabled()) log.info(dumpStats(counter));
83                 }
84                 if(counter >= expected_msgs && !done) {
85                     ending=System.currentTimeMillis();
86                     done=true;
87                 }
88             }
89
90             if(counter >= expected_msgs) {
91                 done=true;
92                 if(gnuplot_output) {
93                     StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
94                     sb.append("\n##### msgs_received");
95                     sb.append(", free_mem [KB] ");
96                     sb.append(", total_mem [KB] ");
97                     sb.append(", total_msgs_sec [msgs/sec] ");
98                     sb.append(", total_throughput [KB/sec] ");
99                     sb.append(", rolling_msgs_sec (last ").append(log_interval).append(" msgs) ");
100                     sb.append(" [msgs/sec] ");
101                     sb.append(", rolling_throughput (last ").append(log_interval).append(" msgs) ");
102                     sb.append(" [KB/sec]\n");
103                     if(log.isInfoEnabled()) log.info(sb.toString());
104                 }
105                 elapsed_time=(ending - beginning);
106
107                 System.out.println("expected_msgs=" + expected_msgs + ", elapsed_time=" + elapsed_time);
108
109                 throughput_s=expected_msgs / (elapsed_time/1000.0);
110                 throughput_b=(expected_msgs * (msg_size/1000.0)) / (elapsed_time/1000.0);
111
112                 String JavaDoc result="Received " + expected_msgs + " msgs. in " + elapsed_time + " msec.\n" +
113                         "Throughput: " + throughput_s + " [msgs/sec]\n" +
114                         "Throughput: " + throughput_b + " [KB/sec]\n" +
115                         "Total received: " + expected_msgs * (msg_size / 1000.0 / 1000.0) + " [MB]\n";
116                 System.out.println(result);
117                 if(log.isInfoEnabled()) log.info(result);
118             }
119         }
120     }
121
122
123
124     String JavaDoc dumpStats(long received_msgs) {
125         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
126         if(gnuplot_output)
127             sb.append(received_msgs).append(' ');
128         else
129             sb.append("\nmsgs_received=").append(received_msgs);
130
131         if(gnuplot_output)
132             sb.append(Runtime.getRuntime().freeMemory() / 1000.0).append(' ');
133         else
134             sb.append(", free_mem=").append(Runtime.getRuntime().freeMemory() / 1000.0);
135
136         if(gnuplot_output)
137             sb.append(Runtime.getRuntime().totalMemory() / 1000.0).append(' ');
138         else
139             sb.append(", total_mem=").append(Runtime.getRuntime().totalMemory() / 1000.0).append('\n');
140
141         dumpThroughput(sb, received_msgs);
142         return sb.toString();
143     }
144
145     void dumpThroughput(StringBuffer JavaDoc sb, long received_msgs) {
146         double tmp;
147         long current=System.currentTimeMillis();
148
149         tmp=(1000 * counter) / (current - beginning);
150         if(gnuplot_output)
151             sb.append(tmp).append(' ');
152         else
153             sb.append("total_msgs_sec=").append(tmp).append(" [msgs/sec]");
154
155         tmp=(received_msgs * msg_size) / (current - beginning);
156         if(gnuplot_output)
157             sb.append(tmp).append(' ');
158         else
159             sb.append("\ntotal_throughput=").append(tmp).append(" [KB/sec]");
160
161         tmp=(1000 * log_interval) / (current - last_dump);
162         if(gnuplot_output)
163             sb.append(tmp).append(' ');
164         else {
165             sb.append("\nrolling_msgs_sec (last ").append(log_interval).append(" msgs)=");
166             sb.append(tmp).append(" [msgs/sec]");
167         }
168
169         tmp=(log_interval * msg_size) / (current - last_dump);
170         if(gnuplot_output)
171             sb.append(tmp).append(' ');
172         else {
173             sb.append("\nrolling_throughput (last ").append(log_interval).append(" msgs)=");
174             sb.append(tmp).append(" [KB/sec]\n");
175         }
176         last_dump=current;
177     }
178
179
180
181 }
182
Popular Tags