KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > adapttcp > SenderThread


package org.jgroups.tests.adapttcp;

import org.apache.log4j.Logger;
import org.jgroups.util.Util;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Iterator;
import java.util.List;
12
/**
 * Sender thread: opens a TCP connection to every node in the given list and
 * then sends num_msgs messages of msg_size bytes each to all of them.
 * @author Milcan Prica (prica@deei.units.it)
 * @author Bela Ban (belaban@yahoo.com)
 */

20 public class SenderThread extends Thread JavaDoc {
21     private int num_msgs;
22     private int msg_size;
23     Logger log=Logger.getLogger(this.getClass());
24     long log_interval=1000;
25     boolean gnuplot_output=Boolean.getBoolean("gnuplot_output");
26     List JavaDoc nodes;
27
28
29
30     public SenderThread(List JavaDoc nodes, int num_msgs, int ms, long log_interval) {
31         this.num_msgs=num_msgs;
32         msg_size=ms;
33         this.log_interval=log_interval;
34         this.nodes=nodes;
35     }
36
37     public void run() {
38         long total_msgs=0;
39         ConnectionTable ct=null;
40
41         System.out.println("Sender thread started...");
42
43         try {
44             ct=new ConnectionTable(nodes);
45             byte[] msg=new byte[msg_size];
46             for(int h=0; h < msg_size; h++) {
47                 msg[h]=(byte)h;
48             }
49
50             while(true) {
51                 try {
52                     ct.init();
53                     break;
54                 }
55                 catch(Exception JavaDoc ex) {
56                     Util.sleep(1000);
57                 }
58             }
59             System.out.println("Everyone joined, ready to begin test...\n" +
60                     "cluster: " + ct.toString());
61
62             for(int i=0; i < num_msgs; i++) {
63                 ct.writeMessage(msg);
64                 total_msgs++;
65                 if(total_msgs % 1000 == 0) {
66                     System.out.println("++ sent " + total_msgs);
67                 }
68                 if(total_msgs % log_interval == 0) {
69                     if(gnuplot_output == false)
70                         if(log.isInfoEnabled()) log.info(dumpStats(total_msgs));
71                 }
72             }
73             System.out.println("Sent all bursts. Sender terminates.\n");
74         }
75         catch(Exception JavaDoc e) {
76             e.printStackTrace();
77         }
78         finally {
79             if(ct != null)
80                 ct.close();
81         }
82     }
83
84
85
86     String JavaDoc dumpStats(long sent_msgs) {
87         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
88         sb.append("\nmsgs_sent=").append(sent_msgs).append('\n');
89         sb.append("free_mem=").append(Runtime.getRuntime().freeMemory());
90         sb.append(" (total_mem=").append(Runtime.getRuntime().totalMemory()).append(")\n");
91         return sb.toString();
92     }
93
94     class ConnectionTable {
95         List JavaDoc nodes;
96         Connection[] connections;
97
98         ConnectionTable(List JavaDoc nodes) throws Exception JavaDoc {
99             this.nodes=nodes;
100             connections=new Connection[nodes.size()];
101         }
102
103
104         void init() throws Exception JavaDoc {
105             int i=0;
106
107             for(Iterator JavaDoc it=nodes.iterator(); it.hasNext();) {
108                 InetAddress JavaDoc addr=(InetAddress JavaDoc)it.next();
109                 if(connections[i] == null) {
110                     connections[i]=new Connection(addr);
111                     System.out.println("-- connected to " +addr);
112                     System.out.flush();
113                 }
114                 i++;
115             }
116         }
117
118         // todo: parallelize
119
void writeMessage(byte[] msg) throws Exception JavaDoc {
120             for(int i=0; i < connections.length; i++) {
121                 Connection c=connections[i];
122                 if(c != null)
123                     c.writeMessage(msg);
124             }
125         }
126
127         void close() {
128             for(int i=0; i < connections.length; i++) {
129                 Connection c=connections[i];
130                 if(c != null)
131                     c.close();
132             }
133         }
134
135         public String JavaDoc toString() {
136             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
137             for(Iterator JavaDoc it=nodes.iterator(); it.hasNext();) {
138                 InetAddress JavaDoc inetAddress=(InetAddress JavaDoc)it.next();
139                 sb.append(inetAddress).append(' ');
140             }
141             return sb.toString();
142         }
143     }
144
145     class Connection {
146         Socket JavaDoc sock;
147         DataOutputStream JavaDoc out;
148
149         Connection(InetAddress JavaDoc addr) throws Exception JavaDoc {
150             sock=new Socket JavaDoc(addr, Test.srv_port);
151             out=new DataOutputStream JavaDoc(new BufferedOutputStream JavaDoc(sock.getOutputStream()));
152         }
153
154         void writeMessage(byte[] msg) throws Exception JavaDoc {
155             out.writeInt(msg.length);
156             out.write(msg, 0, msg.length);
157             out.flush();
158         }
159
160         void close() {
161             try {
162                 sock.close();
163             }
164             catch(Exception JavaDoc ex) {
165
166             }
167         }
168     }
169
170 }
171
Popular Tags