1 package org.jgroups.tests; 2 3 import org.jgroups.*; 4 import org.jgroups.util.Util; 5 6 import java.util.concurrent.CyclicBarrier ; 7 import java.util.concurrent.atomic.AtomicLong ; 8 9 16 public class MultiplexerStressTest { 17 Channel c11, c12, c21, c22, c31, c32; 18 ChannelFactory f1, f2, f3; 19 private MyReceiver r11, r12, r21, r22, r31, r32; 20 21 static final int NUM_MSGS=100000; 22 static final int SIZE=1000; 23 24 25 public MultiplexerStressTest() { 26 27 } 28 29 private void start() throws Exception { 30 CyclicBarrier barrier=new CyclicBarrier (7); 31 f1=new JChannelFactory(); 32 f2=new JChannelFactory(); 33 f3=new JChannelFactory(); 34 35 f1.setMultiplexerConfig("stacks.xml"); 36 f2.setMultiplexerConfig("stacks.xml"); 37 f3.setMultiplexerConfig("stacks.xml"); 38 39 c11=f1.createMultiplexerChannel("udp", "A"); 40 c11.connect("X"); 41 r11=new MyReceiver(barrier); 42 c11.setReceiver(r11); 43 44 c12=f1.createMultiplexerChannel("udp", "B"); 45 c12.connect("X"); 46 r12=new MyReceiver(barrier); 47 c12.setReceiver(r12); 48 49 c21=f2.createMultiplexerChannel("udp", "A"); 50 c21.connect("X"); 51 r21=new MyReceiver(barrier); 52 c21.setReceiver(r21); 53 54 c22=f2.createMultiplexerChannel("udp", "B"); 55 c22.connect("X"); 56 r22=new MyReceiver(barrier); 57 c22.setReceiver(r22); 58 59 c31=f3.createMultiplexerChannel("udp", "A"); 60 c31.connect("X"); 61 r31=new MyReceiver(barrier); 62 c31.setReceiver(r31); 63 64 c32=f3.createMultiplexerChannel("udp", "B"); 65 c32.connect("X"); 66 r32=new MyReceiver(barrier); 67 c32.setReceiver(r32); 68 69 long start, stop; 70 71 new MySender(barrier, c11).start(); 72 new MySender(barrier, c12).start(); 73 new MySender(barrier, c21).start(); 74 new MySender(barrier, c22).start(); 75 new MySender(barrier, c31).start(); 76 new MySender(barrier, c32).start(); 77 78 barrier.await(); start=System.currentTimeMillis(); 80 81 barrier.await(); stop=System.currentTimeMillis(); 83 84 System.out.println("Cluster A:\n" + printStats(stop-start,new MyReceiver[]{r11,r21,r31})); 85 System.out.println("Cluster B:\n" + printStats(stop-start,new MyReceiver[]{r12,r22,r32})); 86 87 c32.close(); 88 c31.close(); 89 c21.close(); 90 c22.close(); 91 c12.close(); 92 c11.close(); 93 } 94 95 private String printStats(long total_time, MyReceiver[] cluster) { 96 int num_msgs=0; 97 long num_bytes=0; 98 int cluster_size=cluster.length; 99 100 for(int i=0; i < cluster_size; i++) { 101 num_msgs+=cluster[i].getNumMessages(); 102 num_bytes+=cluster[i].getNumBytes(); 103 } 104 105 double msgs_per_sec=num_msgs / (total_time / 1000.00); 106 double bytes_per_sec=num_bytes * SIZE / (total_time / 1000.00); 107 108 StringBuilder sb=new StringBuilder (); 109 sb.append("total msgs=").append(num_msgs).append(", msg rate=").append(msgs_per_sec); 110 sb.append(", total time=").append(total_time / 1000.00); 111 sb.append(", throughput=").append(Util.printBytes(bytes_per_sec)); 112 return sb.toString(); 113 } 114 115 116 private static class MyReceiver extends ReceiverAdapter { 117 AtomicLong received_msgs=new AtomicLong (0); 118 AtomicLong received_bytes=new AtomicLong (0); 119 CyclicBarrier barrier; 120 int print=NUM_MSGS / 10; 121 122 123 public MyReceiver(CyclicBarrier barrier) { 124 this.barrier=barrier; 125 } 126 127 public long getNumMessages() { 128 return received_msgs.get(); 129 } 130 131 public long getNumBytes() { 132 return received_bytes.get(); 133 } 134 135 136 public void receive(Message msg) { 137 int length=msg.getLength(); 138 if(length > 0) { 139 received_msgs.incrementAndGet(); 140 received_bytes.addAndGet(length); 141 142 if(received_msgs.get() % print == 0) 143 System.out.println("received " + received_msgs.get() + " msgs"); 144 145 if(received_msgs.get() >= NUM_MSGS) { 146 try { 147 barrier.await(); 148 } 149 catch(Exception e) { 150 e.printStackTrace(); 151 } 152 } 153 } 154 } 155 } 156 157 158 159 private static class MySender extends Thread { 160 CyclicBarrier barrier; 161 Channel ch; 162 163 164 public MySender(CyclicBarrier barrier, Channel ch) { 165 this.barrier=barrier; 166 this.ch=ch; 167 } 168 169 public void run() { 170 byte[] buf=new byte[SIZE]; 171 Message msg; 172 int print=NUM_MSGS / 10; 173 try { 174 barrier.await(); 175 for(int i=1; i <= NUM_MSGS; i++) { 176 msg=new Message(null, null, buf, 0, buf.length); 177 ch.send(msg); 178 if(i % print == 0) 179 System.out.println(getName() + ": sent " + i + " msgs"); 180 } 181 } 182 catch(Exception e) { 183 e.printStackTrace(); 184 } 185 } 186 } 187 188 189 public static void main(String [] args) throws Exception { 190 new MultiplexerStressTest().start(); 191 } 192 193 194 } 195 | Popular Tags |