1 package org.jgroups.tests.perf.transports; 2 3 4 import org.jgroups.stack.IpAddress; 5 import org.jgroups.tests.perf.Receiver; 6 import org.jgroups.tests.perf.Transport; 7 8 import java.io.IOException ; 9 import java.net.DatagramPacket ; 10 import java.net.DatagramSocket ; 11 import java.net.InetAddress ; 12 import java.net.MulticastSocket ; 13 import java.util.Properties ; 14 15 20 public class UdpTransport implements Transport { 21 Receiver receiver=null; 22 Properties config=null; 23 InetAddress mcast_addr=null; 24 int mcast_port=7500; 25 InetAddress bind_addr=null; 26 MulticastSocket mcast_sock=null; 27 DatagramSocket ucast_sock=null; 28 IpAddress local_addr=null; 29 ReceiverThread mcast_receiver=null; 30 ReceiverThread ucast_receiver=null; 31 int max_receiver_buffer_size=500000; 32 int max_send_buffer_size=500000; 33 34 35 public UdpTransport() { 36 } 37 38 public Object getLocalAddress() { 39 return local_addr; 40 } 41 42 public void create(Properties properties) throws Exception { 43 this.config=properties; 44 String mcast_addr_str=System.getProperty("udp.mcast_addr", config.getProperty("mcast_addr")); 45 if(mcast_addr_str == null) 46 mcast_addr_str="228.8.8.8"; 47 mcast_addr=InetAddress.getByName(mcast_addr_str); 48 49 String bind_addr_str=System.getProperty("udp.bind_addr", config.getProperty("bind_addr")); 50 if(bind_addr_str != null) { 51 bind_addr=InetAddress.getByName(bind_addr_str); 52 } 53 else 54 bind_addr=InetAddress.getLocalHost(); 55 56 ucast_sock=new DatagramSocket (0, bind_addr); 57 ucast_sock.setReceiveBufferSize(max_receiver_buffer_size); 58 ucast_sock.setSendBufferSize(max_send_buffer_size); 59 mcast_sock=new MulticastSocket (mcast_port); 60 mcast_sock.setReceiveBufferSize(max_receiver_buffer_size); 61 mcast_sock.setSendBufferSize(max_send_buffer_size); 62 if(bind_addr != null) 63 mcast_sock.setInterface(bind_addr); 64 mcast_sock.joinGroup(mcast_addr); 65 local_addr=new IpAddress(ucast_sock.getLocalAddress(), ucast_sock.getLocalPort()); 66 System.out.println("-- local_addr is " + local_addr); 67 } 68 69 70 public void start() throws Exception { 71 mcast_receiver=new ReceiverThread(mcast_sock); 72 ucast_receiver=new ReceiverThread(ucast_sock); 73 mcast_receiver.start(); 74 ucast_receiver.start(); 75 } 76 77 public void stop() { 78 if(mcast_receiver != null) 79 mcast_receiver.stop(); 80 if(ucast_receiver != null) 81 ucast_receiver.stop(); 82 } 83 84 public void destroy() { 85 if(mcast_sock != null) 86 mcast_sock.close(); 87 if(ucast_sock != null) 88 ucast_sock.close(); 89 } 90 91 public void setReceiver(Receiver r) { 92 this.receiver=r; 93 } 94 95 public void send(Object destination, byte[] payload) throws Exception { 96 DatagramPacket p; 97 if(destination == null) { 98 p=new DatagramPacket (payload, payload.length, mcast_addr, mcast_port); 99 } 100 else { 101 IpAddress addr=(IpAddress)destination; 102 p=new DatagramPacket (payload, payload.length, addr.getIpAddress(), addr.getPort()); 103 104 } 105 ucast_sock.send(p); 106 } 107 108 109 110 111 112 113 114 class ReceiverThread implements Runnable { 115 DatagramSocket sock; 116 Thread t=null; 117 118 ReceiverThread(DatagramSocket sock) { 119 this.sock=sock; 120 } 121 122 void start() throws Exception { 123 t=new Thread (this, "ReceiverThread for " + sock.getLocalAddress() + ':' + sock.getLocalPort()); 124 t.start(); 125 } 126 127 void stop() { 128 t=null; 129 if(sock != null) 130 sock.close(); 131 } 132 133 public void run() { 134 byte[] buf=new byte[128000]; 135 DatagramPacket p; 136 137 while(t != null) { 138 p=new DatagramPacket (buf, buf.length); 139 try { 140 sock.receive(p); 141 if(receiver != null) { 142 IpAddress addr=new IpAddress(p.getAddress(), p.getPort()); 143 receiver.receive(addr, p.getData()); 144 } 145 } 146 catch(IOException e) { 147 if(sock == null) 148 t=null; 149 } 150 } 151 t=null; 152 } 153 } 154 } 155 | Popular Tags |