KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > MultiplexerStressTest


1 package org.jgroups.tests;
2
3 import org.jgroups.*;
4 import org.jgroups.util.Util;
5
6 import java.util.concurrent.CyclicBarrier JavaDoc;
7 import java.util.concurrent.atomic.AtomicLong JavaDoc;
8
9 /**
10  * Simple and experimental performance program for the Multiplexer. Runs 6 MuxChannels (2 clusters) inside the
11  * same VM. Not optimal, but the main reason was to see whether the Multiplexer falls apart when we stress test it
12  * (seems not to be the case).
13  * @author Bela Ban
14  * @version $Id: MultiplexerStressTest.java,v 1.1 2007/03/05 16:21:22 belaban Exp $
15  */

16 public class MultiplexerStressTest {
17     Channel c11, c12, c21, c22, c31, c32;
18     ChannelFactory f1, f2, f3;
19     private MyReceiver r11, r12, r21, r22, r31, r32;
20
21     static final int NUM_MSGS=100000;
22     static final int SIZE=1000;
23
24
25     public MultiplexerStressTest() {
26
27     }
28
29     private void start() throws Exception JavaDoc {
30         CyclicBarrier JavaDoc barrier=new CyclicBarrier JavaDoc(7);
31         f1=new JChannelFactory();
32         f2=new JChannelFactory();
33         f3=new JChannelFactory();
34
35         f1.setMultiplexerConfig("stacks.xml");
36         f2.setMultiplexerConfig("stacks.xml");
37         f3.setMultiplexerConfig("stacks.xml");
38
39         c11=f1.createMultiplexerChannel("udp", "A");
40         c11.connect("X");
41         r11=new MyReceiver(barrier);
42         c11.setReceiver(r11);
43
44         c12=f1.createMultiplexerChannel("udp", "B");
45         c12.connect("X");
46         r12=new MyReceiver(barrier);
47         c12.setReceiver(r12);
48
49         c21=f2.createMultiplexerChannel("udp", "A");
50         c21.connect("X");
51         r21=new MyReceiver(barrier);
52         c21.setReceiver(r21);
53
54         c22=f2.createMultiplexerChannel("udp", "B");
55         c22.connect("X");
56         r22=new MyReceiver(barrier);
57         c22.setReceiver(r22);
58
59         c31=f3.createMultiplexerChannel("udp", "A");
60         c31.connect("X");
61         r31=new MyReceiver(barrier);
62         c31.setReceiver(r31);
63
64         c32=f3.createMultiplexerChannel("udp", "B");
65         c32.connect("X");
66         r32=new MyReceiver(barrier);
67         c32.setReceiver(r32);
68
69         long start, stop;
70
71         new MySender(barrier, c11).start();
72         new MySender(barrier, c12).start();
73         new MySender(barrier, c21).start();
74         new MySender(barrier, c22).start();
75         new MySender(barrier, c31).start();
76         new MySender(barrier, c32).start();
77
78         barrier.await(); // start the 6 sender threads
79
start=System.currentTimeMillis();
80
81         barrier.await(); // results from the 6 receivers
82
stop=System.currentTimeMillis();
83
84         System.out.println("Cluster A:\n" + printStats(stop-start,new MyReceiver[]{r11,r21,r31}));
85         System.out.println("Cluster B:\n" + printStats(stop-start,new MyReceiver[]{r12,r22,r32}));
86
87         c32.close();
88         c31.close();
89         c21.close();
90         c22.close();
91         c12.close();
92         c11.close();
93     }
94
95     private String JavaDoc printStats(long total_time, MyReceiver[] cluster) {
96         int num_msgs=0;
97         long num_bytes=0;
98         int cluster_size=cluster.length;
99
100         for(int i=0; i < cluster_size; i++) {
101             num_msgs+=cluster[i].getNumMessages();
102             num_bytes+=cluster[i].getNumBytes();
103         }
104
105         double msgs_per_sec=num_msgs / (total_time / 1000.00);
106         double bytes_per_sec=num_bytes * SIZE / (total_time / 1000.00);
107
108         StringBuilder JavaDoc sb=new StringBuilder JavaDoc();
109         sb.append("total msgs=").append(num_msgs).append(", msg rate=").append(msgs_per_sec);
110         sb.append(", total time=").append(total_time / 1000.00);
111         sb.append(", throughput=").append(Util.printBytes(bytes_per_sec));
112         return sb.toString();
113     }
114
115
116     private static class MyReceiver extends ReceiverAdapter {
117         AtomicLong JavaDoc received_msgs=new AtomicLong JavaDoc(0);
118         AtomicLong JavaDoc received_bytes=new AtomicLong JavaDoc(0);
119         CyclicBarrier JavaDoc barrier;
120         int print=NUM_MSGS / 10;
121
122
123         public MyReceiver(CyclicBarrier JavaDoc barrier) {
124             this.barrier=barrier;
125         }
126
127         public long getNumMessages() {
128             return received_msgs.get();
129         }
130
131         public long getNumBytes() {
132             return received_bytes.get();
133         }
134
135
136         public void receive(Message msg) {
137             int length=msg.getLength();
138             if(length > 0) {
139                 received_msgs.incrementAndGet();
140                 received_bytes.addAndGet(length);
141
142                 if(received_msgs.get() % print == 0)
143                     System.out.println("received " + received_msgs.get() + " msgs");
144
145                 if(received_msgs.get() >= NUM_MSGS) {
146                     try {
147                         barrier.await();
148                     }
149                     catch(Exception JavaDoc e) {
150                         e.printStackTrace();
151                     }
152                 }
153             }
154         }
155     }
156
157
158
159     private static class MySender extends Thread JavaDoc {
160         CyclicBarrier JavaDoc barrier;
161         Channel ch;
162
163
164         public MySender(CyclicBarrier JavaDoc barrier, Channel ch) {
165             this.barrier=barrier;
166             this.ch=ch;
167         }
168
169         public void run() {
170             byte[] buf=new byte[SIZE];
171             Message msg;
172             int print=NUM_MSGS / 10;
173             try {
174                 barrier.await();
175                 for(int i=1; i <= NUM_MSGS; i++) {
176                     msg=new Message(null, null, buf, 0, buf.length);
177                     ch.send(msg);
178                     if(i % print == 0)
179                         System.out.println(getName() + ": sent " + i + " msgs");
180                 }
181             }
182             catch(Exception JavaDoc e) {
183                 e.printStackTrace();
184             }
185         }
186     }
187
188
189     public static void main(String JavaDoc[] args) throws Exception JavaDoc {
190         new MultiplexerStressTest().start();
191     }
192
193
194 }
195
Popular Tags