1 package org.jgroups.tests.adaptudp; 2 3 import org.jgroups.stack.IpAddress; 4 import org.jgroups.util.Util; 5 6 import java.io.*; 7 import java.net.DatagramPacket ; 8 import java.net.DatagramSocket ; 9 import java.net.InetAddress ; 10 import java.net.MulticastSocket ; 11 import java.util.ArrayList ; 12 import java.util.List ; 13 14 15 16 32 public class UdpTester { 33 private boolean sender; 34 private int num_msgs; 35 private int msg_size; 36 private int num_senders; 37 private long log_interval=1000; 38 MulticastSocket recv_sock; 39 DatagramSocket send_sock; 40 int num_members; 41 IpAddress local_addr; 42 MyReceiver receiver=null; 43 44 45 List members=new ArrayList (); 46 47 public UdpTester(MulticastSocket recv_sock, DatagramSocket send_sock, boolean snd, int num_msgs, 48 int msg_size, int num_members, int ns, long log_interval) { 49 sender=snd; 50 this.num_msgs=num_msgs; 51 this.msg_size=msg_size; 52 num_senders=ns; 53 this.num_members=num_members; 54 this.log_interval=log_interval; 55 this.recv_sock=recv_sock; 56 this.send_sock=send_sock; 57 this.local_addr=new IpAddress(send_sock.getLocalAddress(), send_sock.getLocalPort()); 58 } 59 60 public void initialize() throws Exception { 61 62 waitUntilAllMembersHaveJoined(); 63 Util.sleep(1000); 64 65 new ReceiverThread(recv_sock, num_msgs, msg_size, num_senders, log_interval).start(); 66 if(sender) { 67 new SenderThread(send_sock, num_msgs, msg_size, log_interval).start(); 68 } 69 } 70 71 void waitUntilAllMembersHaveJoined() throws Exception { 72 discoverExistingMembers(); 73 74 } 75 76 private void discoverExistingMembers() throws Exception { 77 receiver=new MyReceiver(); 78 members.clear(); 79 receiver.start(); 80 receiver.discoverExistingMembers(); 81 receiver.sendMyAddress(); 82 receiver.waitUntilAllMembersHaveJoined(); 83 84 } 86 87 class MyReceiver extends Thread { 88 boolean running=true; 89 90 public void run() { 91 byte[] buf=new byte[65000]; 92 DatagramPacket p=new DatagramPacket (buf, buf.length); 93 ByteArrayInputStream input; 94 ObjectInputStream in; 95 Request req; 96 boolean running=true; 97 98 while(running) { 99 try { 100 recv_sock.receive(p); 101 input=new ByteArrayInputStream(p.getData(), 0, p.getLength()); 102 in=new ObjectInputStream(input); 103 req=(Request)in.readObject(); 104 switch(req.type) { 105 case Request.DISCOVERY_REQ: 106 byte[] tmp; 107 ByteArrayOutputStream output=new ByteArrayOutputStream(); 108 ObjectOutputStream out=new ObjectOutputStream(output); 109 Request rsp=new Request(Request.NEW_MEMBER, local_addr); 110 out.writeObject(rsp); 111 output.flush(); 112 tmp=output.toByteArray(); 113 DatagramPacket rsp_p=new DatagramPacket (tmp, tmp.length, 114 InetAddress.getByName(Test.mcast_addr), Test.mcast_port); 115 send_sock.send(rsp_p); 116 break; 117 case Request.NEW_MEMBER: 118 IpAddress new_mbr=(IpAddress)req.arg; 119 if(!members.contains(new_mbr)) { 120 members.add(new_mbr); 121 System.out.println("-- discovered " + new_mbr); 122 if(members.size() >= num_members) { 123 System.out.println("-- all members have joined (" + members + ')'); 124 running=false; 125 break; 126 } 127 } 128 break; 129 default: 130 System.err.println("don't recognize request with type=" + req.type); 131 break; 132 } 133 } 134 catch(IOException e) { 135 e.printStackTrace(); 136 break; 137 } 138 catch(ClassNotFoundException e) { 139 e.printStackTrace(); 140 } 141 } 142 } 143 144 public void discoverExistingMembers() throws Exception { 145 Request req=new Request(Request.DISCOVERY_REQ, null); 146 byte[] b=Util.objectToByteBuffer(req); 147 DatagramPacket p=new DatagramPacket (b, b.length, InetAddress.getByName(Test.mcast_addr), Test.mcast_port); 148 send_sock.send(p); 149 150 } 151 152 public void sendMyAddress() throws Exception { 153 Request req=new Request(Request.NEW_MEMBER, local_addr); 154 byte[] b=Util.objectToByteBuffer(req); 155 DatagramPacket p=new DatagramPacket (b, b.length, InetAddress.getByName(Test.mcast_addr), Test.mcast_port); 156 send_sock.send(p); 157 } 158 159 public void waitUntilAllMembersHaveJoined() throws InterruptedException { 160 if(members.size() < num_members) { 161 if(receiver.isAlive()) 162 receiver.join(); 163 } 164 } 165 166 } 167 168 } 169 170 | Popular Tags |