1 24 25 package org.objectweb.tribe.channel; 26 27 import java.io.Serializable ; 28 import java.net.InetAddress ; 29 import java.util.ArrayList ; 30 31 import org.objectweb.tribe.adapters.PullPushAdapter; 32 import org.objectweb.tribe.channel.tcp.TcpChannelPool; 33 import org.objectweb.tribe.common.Address; 34 import org.objectweb.tribe.common.Group; 35 import org.objectweb.tribe.common.GroupIdentifier; 36 import org.objectweb.tribe.common.IpAddress; 37 import org.objectweb.tribe.common.Member; 38 import org.objectweb.tribe.exceptions.AlreadyMemberException; 39 import org.objectweb.tribe.gms.GroupMembershipListener; 40 import org.objectweb.tribe.gms.GroupMembershipService; 41 import org.objectweb.tribe.gms.discovery.UdpDiscoveryService; 42 import org.objectweb.tribe.messages.ByteMessage; 43 import org.objectweb.tribe.messages.MessageListener; 44 45 51 public class ChannelPerformaceTest 52 implements 53 MessageListener, 54 GroupMembershipListener 55 { 56 private ReliableGroupChannelWithGms channel; 57 private UdpDiscoveryService discovery; 58 private GroupMembershipService gms; 59 private final GroupIdentifier testGid = new GroupIdentifier("groupTest"); 60 private PullPushAdapter adapter; 61 private int received; 62 private int msgSize; 63 private int nbOfMessages; 64 private int nbOfSenders; 65 private int nbOfMembers; 66 private int done; 67 private long start; 68 private boolean sending = false; 69 70 78 public ChannelPerformaceTest(int members, int senders, int messages, int size) 79 { 80 if (members == 0) 81 { 82 System.out.println("Members must be >0."); 83 System.exit(1); 84 } 85 if (senders == 0) 86 { 87 System.out.println("Senders must be >0."); 88 System.exit(1); 89 } 90 if (senders > members) 91 { 92 System.out 93 .println("Senders must be lower or equal to number of members."); 94 System.exit(1); 95 } 96 this.nbOfMembers = members; 97 this.nbOfSenders = senders; 98 this.nbOfMessages = messages; 99 this.msgSize = size; 100 try 101 { 102 initializeChannel(); 103 } 104 catch (Exception e) 105 { 106 e.printStackTrace(); 107 System.out.println("Failed to initialize channel. Exiting ..."); 108 System.exit(1); 109 } 110 } 111 112 117 private void initializeChannel() throws Exception 118 { 119 final InetAddress MULTICAST_ADDRESS = InetAddress.getByName("224.7.65.23"); 120 final int MULTICAST_PORT = 2288; 121 final IpAddress MULTICAST_IP = new IpAddress(MULTICAST_ADDRESS, 122 MULTICAST_PORT); 123 final InetAddress REPLY_ADDRESS = InetAddress.getLocalHost(); 124 final int REPLY_PORT = 0; final IpAddress REPLY_IP = new IpAddress(REPLY_ADDRESS, REPLY_PORT); 126 127 System.out.println("Using address: " + REPLY_IP); 128 129 discovery = new UdpDiscoveryService(MULTICAST_IP, REPLY_IP); 130 gms = new GroupMembershipService(REPLY_IP, TcpChannelPool.getChannelPool(), 131 discovery); 132 channel = new ReliableGroupChannelWithGms(gms); 133 gms.registerGroupMembershipListener(this); 134 135 137 try 138 { 139 channel.join(testGid); 140 } 141 catch (AlreadyMemberException e) 142 { 143 e.printStackTrace(); 144 throw e; 145 } 146 147 adapter = new PullPushAdapter(channel, this); 148 } 149 150 153 private void sendMessages() 154 { 155 synchronized (this) 156 { 157 if (sending) 158 return; 159 sending = true; 160 } 161 ArrayList members = gms.getGroup(testGid).getMembers(); 162 int size = members.size(); 163 for (int i = 0; i < nbOfSenders; i++) 164 { 165 if (members.get(i).equals(channel.getLocalMembership())) 166 { ByteMessage msg = new ByteMessage(new byte[msgSize]); 168 System.out.println("Starting sending " + nbOfMessages + " message of " 169 + msgSize + " bytes to " + size + " members."); 170 System.out.println("Members are: " 171 + gms.getGroup(testGid).getStringMembers()); 172 start = System.currentTimeMillis(); 173 for (int j = 0; j < nbOfMessages; j++) 174 { 175 try 176 { 177 channel.send(msg, members); 178 } 179 catch (Exception e) 180 { 181 e.printStackTrace(); 182 System.out.println("Failure while sending message " + j); 183 System.exit(1); 184 } 185 } 186 long end = System.currentTimeMillis(); 187 System.out.println("Average time per message: " 188 + ((double) (end - start) / (double) (nbOfMessages)) + "ms"); 189 return; 190 } 191 else 192 System.out.println("Member " + members.get(i) + " does not match " 193 + channel.getLocalMembership()); 194 } 195 System.out.println("I am not a sender, waiting for messages."); 196 } 197 198 202 205 public void receive(Serializable msg) 206 { 207 if (msg instanceof PerfTestDone) 208 { 209 done++; 210 if (done == nbOfMembers) 211 { 212 if (start != 0) 213 { 214 long end = System.currentTimeMillis(); 215 System.out.println("Overall average time per message: " 216 + ((double) (end - start) / (double) (nbOfMessages)) + "ms"); 217 } 218 System.out.println("Everybody is done, exiting ..."); 219 System.exit(0); 220 } 221 else 222 System.out.println("Member " + ((PerfTestDone) msg).getSender() 223 + " is done."); 224 return; 225 } 226 227 if (msg instanceof ByteMessage) 228 { 229 received++; 230 if (received == nbOfMessages * nbOfSenders) 231 { 232 try 233 { 234 channel.send(new PerfTestDone(channel.getLocalMembership().toString())); 235 } 236 catch (Exception e) 237 { 238 System.out.println("Failed to send last acknowledgement"); 239 e.printStackTrace(); 240 } 241 System.out.println("Received " + (nbOfMessages * nbOfSenders) 242 + " messages, acknowledging."); 243 } 244 else if (received % 1000 == 0) 245 { 246 System.out.println("Received " + received + " messages"); 247 } 248 } 249 } 250 251 255 public void joinMember(Member m, GroupIdentifier gid) 256 { 257 System.out.println("Member " + m + " has joined the group " + gid); 258 } 259 260 264 public void quitMember(Member m, GroupIdentifier gid) 265 { 266 System.out.println("Member " + m + " has left the group " + gid); 267 } 268 269 273 public void groupComposition(Group g, Address sender) 274 { 275 System.out.println(sender + " send group " + g.getGroupIdentifier() 276 + " composition."); 277 if (gms.getGroup(testGid).getMembers().size() == this.nbOfMembers) 278 sendMessages(); 279 else 280 System.out.println("Group now has " 281 + gms.getGroup(testGid).getMembers().size() + " members."); 282 } 283 284 289 public void failedMember(Member failed, GroupIdentifier gid, Member sender) 290 { 291 System.out.println("Member " + failed + " failed in group " + gid 292 + "(reported by " + sender + ")"); 293 } 294 295 299 311 public static void main(String args[]) 312 { 313 try 314 { 315 ChannelPerformaceTest perf = new ChannelPerformaceTest(Integer 316 .parseInt(args[0]), Integer.parseInt(args[1]), Integer 317 .parseInt(args[2]), Integer.parseInt(args[3])); 318 } 319 catch (Exception e) 320 { 321 e.printStackTrace(); 322 System.out 323 .println("Usage: java org.objectweb.tribe.channel.ChannelPerformaceTest members senders messages msgSize"); 324 System.out.println("members: number of group members"); 325 System.out 326 .println("senders: number of senders (first members to join the group will send)"); 327 System.out.println("messages: number of messages to send to the group"); 328 System.out.println("msgSize: message size in bytes\n"); 329 System.out 330 .println("Example: java org.objectweb.tribe.channel.ChannelPerformaceTest 2 1 10000 1024"); 331 System.exit(1); 332 } 333 } 334 335 } | Popular Tags |