1 package org.jgroups.tests.adaptjms; 2 3 import org.apache.log4j.Logger; 4 5 import javax.jms.ObjectMessage ; 6 import javax.jms.Topic ; 7 import javax.jms.TopicPublisher ; 8 import javax.jms.TopicSession ; 9 import java.util.List ; 10 11 18 public class SenderThread extends Thread { 19 private int num_msgs; 20 private int msg_size; 21 Logger log=Logger.getLogger(this.getClass()); 22 long log_interval=1000; 23 boolean gnuplot_output=Boolean.getBoolean("gnuplot_output"); 24 List nodes; 25 TopicPublisher pub; 26 TopicSession session; 27 Topic topic; 28 29 30 public SenderThread(TopicSession session, TopicPublisher pub, Topic topic, int num_msgs, int ms, long log_interval) { 31 this.num_msgs=num_msgs; 32 msg_size=ms; 33 this.log_interval=log_interval; 34 this.session=session; 35 this.pub=pub; 36 this.topic=topic; 37 } 38 39 public void run() { 40 long total_msgs=0; 41 Request req; 42 ObjectMessage msg; 43 44 System.out.println("Sender thread started..."); 45 46 try { 47 byte[] m=new byte[msg_size]; 48 for(int h=0; h < msg_size; h++) { 49 m[h]=(byte)h; 50 } 51 52 System.out.println("Everyone joined, ready to begin test...\n"); 53 54 for(int i=0; i < num_msgs; i++) { 55 req=new Request(Request.DATA, m); 56 msg=session.createObjectMessage(req); 57 pub.publish(topic, msg); 58 59 total_msgs++; 60 if(total_msgs % 1000 == 0) { 61 System.out.println("++ sent " + total_msgs); 62 } 63 if(total_msgs % log_interval == 0) { 64 if(gnuplot_output == false) 65 if(log.isInfoEnabled()) log.info(dumpStats(total_msgs)); 66 } 67 } 68 System.out.println("Sent all bursts. Sender terminates.\n"); 69 } 70 catch(Exception e) { 71 e.printStackTrace(); 72 } 73 finally { 74 } 75 } 76 77 78 79 String dumpStats(long sent_msgs) { 80 StringBuffer sb=new StringBuffer (); 81 sb.append("\nmsgs_sent=").append(sent_msgs).append('\n'); 82 sb.append("free_mem=").append(Runtime.getRuntime().freeMemory()); 83 sb.append(" (total_mem=").append(Runtime.getRuntime().totalMemory()).append(")\n"); 84 return sb.toString(); 85 } 86 87 } 88 | Popular Tags |