package org.jgroups.tests.adapttcp;

import org.apache.log4j.Logger;
import org.jgroups.util.Util;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Iterator;
import java.util.List;


/**
 * Thread that opens one TCP connection per node (to port <code>Test.srv_port</code>)
 * and then writes <code>num_msgs</code> messages of <code>msg_size</code> bytes to
 * every connected node. Each message is written as a 4-byte length followed by
 * the payload and is flushed immediately.
 */
public class SenderThread extends Thread {
    /** Number of messages between two "++ sent" progress lines on stdout. */
    private static final int PROGRESS_INTERVAL=1000;

    /** Pause (in ms) between two attempts to connect to all nodes. */
    private static final long CONNECT_RETRY_INTERVAL=1000;

    private int num_msgs;
    private int msg_size;
    Logger  log=Logger.getLogger(this.getClass());
    long    log_interval=1000;
    /** Read from the boolean system property "gnuplot_output"; when true, stats logging is suppressed. */
    boolean gnuplot_output=Boolean.getBoolean("gnuplot_output");
    List    nodes;


    /**
     * @param nodes        list of <code>InetAddress</code> instances to connect to
     * @param num_msgs     number of messages to send to each node
     * @param ms           size of each message payload in bytes
     * @param log_interval stats are logged every <code>log_interval</code> messages;
     *                     a value &lt;= 0 disables stats logging
     */
    public SenderThread(List nodes, int num_msgs, int ms, long log_interval) {
        this.num_msgs=num_msgs;
        msg_size=ms;
        this.log_interval=log_interval;
        this.nodes=nodes;
    }

    /**
     * Connects to all nodes (retrying until every connection is established),
     * sends all messages and finally closes all connections.
     */
    public void run() {
        long            total_msgs=0;
        ConnectionTable ct=null;

        System.out.println("Sender thread started...");

        try {
            ct=new ConnectionTable(nodes);
            byte[] msg=new byte[msg_size];
            for(int h=0; h < msg_size; h++) {
                msg[h]=(byte)h;
            }

            // Keep trying until all nodes accept a connection; connections that
            // were already established are kept across attempts (see init()).
            while(true) {
                try {
                    ct.init();
                    break;
                }
                catch(Exception ex) {
                    if(log.isDebugEnabled())
                        log.debug("could not connect to all nodes yet, retrying", ex);
                    Util.sleep(CONNECT_RETRY_INTERVAL);
                }
            }
            System.out.println("Everyone joined, ready to begin test...\n" +
                               "cluster: " + ct.toString());

            for(int i=0; i < num_msgs; i++) {
                ct.writeMessage(msg);
                total_msgs++;
                if(total_msgs % PROGRESS_INTERVAL == 0) {
                    System.out.println("++ sent " + total_msgs);
                }
                // Guard against log_interval <= 0: a zero divisor would throw
                // ArithmeticException and abort the whole test run.
                if(log_interval > 0 && total_msgs % log_interval == 0) {
                    if(!gnuplot_output && log.isInfoEnabled())
                        log.info(dumpStats(total_msgs));
                }
            }
            System.out.println("Sent all bursts. Sender terminates.\n");
        }
        catch(Exception e) {
            // Console output is kept on purpose: this is a command line test
            // tool and log4j may not be configured.
            e.printStackTrace();
        }
        finally {
            if(ct != null)
                ct.close();
        }
    }


    /**
     * Returns a short multi-line report with the number of messages sent and
     * the JVM's current free and total memory.
     */
    String dumpStats(long sent_msgs) {
        StringBuffer sb=new StringBuffer();
        sb.append("\nmsgs_sent=").append(sent_msgs).append('\n');
        sb.append("free_mem=").append(Runtime.getRuntime().freeMemory());
        sb.append(" (total_mem=").append(Runtime.getRuntime().totalMemory()).append(")\n");
        return sb.toString();
    }

    /** Holds one {@link Connection} slot per node, in the iteration order of the node list. */
    class ConnectionTable {
        List         nodes;
        Connection[] connections;

        ConnectionTable(List nodes) throws Exception {
            this.nodes=nodes;
            connections=new Connection[nodes.size()];
        }


        /**
         * Connects to every node that has no connection yet. Throws as soon as
         * one connect fails; connections created so far are kept, so the method
         * can simply be called again.
         */
        void init() throws Exception {
            int i=0;

            for(Iterator it=nodes.iterator(); it.hasNext();) {
                InetAddress addr=(InetAddress)it.next();
                if(connections[i] == null) {
                    connections[i]=new Connection(addr);
                    System.out.println("-- connected to " + addr);
                    System.out.flush();
                }
                i++;
            }
        }

        /** Writes the message to every established connection. */
        void writeMessage(byte[] msg) throws Exception {
            for(int i=0; i < connections.length; i++) {
                Connection c=connections[i];
                if(c != null)
                    c.writeMessage(msg);
            }
        }

        /** Closes every established connection. */
        void close() {
            for(int i=0; i < connections.length; i++) {
                Connection c=connections[i];
                if(c != null)
                    c.close();
            }
        }

        /** Returns the node addresses, each followed by a single space. */
        public String toString() {
            StringBuffer sb=new StringBuffer();
            for(Iterator it=nodes.iterator(); it.hasNext();) {
                InetAddress inetAddress=(InetAddress)it.next();
                sb.append(inetAddress).append(' ');
            }
            return sb.toString();
        }
    }

    /** A TCP connection to a single node with a buffered data output stream on top. */
    class Connection {
        Socket           sock;
        DataOutputStream out;

        Connection(InetAddress addr) throws Exception {
            sock=new Socket(addr, Test.srv_port);
            try {
                out=new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
            }
            catch(Exception ex) {
                // Don't leak the already connected socket if the stream cannot be set up.
                close();
                throw ex;
            }
        }

        /** Writes the payload length (int) followed by the payload and flushes. */
        void writeMessage(byte[] msg) throws Exception {
            out.writeInt(msg.length);
            out.write(msg, 0, msg.length);
            out.flush();
        }

        /** Closes the socket; failures are only logged since closing is best effort. */
        void close() {
            try {
                sock.close();
            }
            catch(Exception ex) {
                if(log.isDebugEnabled())
                    log.debug("failed closing socket " + sock, ex);
            }
        }
    }

}