1 package org.jgroups.protocols; 2 3 import org.jgroups.Message; 4 import org.jgroups.Event; 5 import org.jgroups.util.Buffer; 6 import org.jgroups.util.ExposedByteArrayOutputStream; 7 import org.jgroups.util.Util; 8 9 import java.io.*; 10 import java.net.*; 11 import java.util.Properties ; 12 import java.util.Enumeration ; 13 14 24 public class MPING extends PING implements Runnable { 25 MulticastSocket mcast_sock=null; 26 Thread receiver=null; 27 InetAddress bind_addr=null; 28 boolean bind_to_all_interfaces=true; 29 int ip_ttl=16; 30 InetAddress mcast_addr=null; 31 int mcast_port=7555; 32 33 34 final ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream(512); 35 byte receive_buf[]=new byte[1024]; 36 static final String IGNORE_BIND_ADDRESS_PROPERTY="ignore.bind.address"; 37 38 39 public String getName() { 40 return "MPING"; 41 } 42 43 44 public boolean setProperties(Properties props) { 45 String tmp=null, str; 46 47 try { 49 tmp=System.getProperty("bind.address"); 50 if(Boolean.getBoolean(IGNORE_BIND_ADDRESS_PROPERTY)) { 51 tmp=null; 52 } 53 } 54 catch (SecurityException ex){ 55 } 56 57 if(tmp != null) 58 str=tmp; 59 else 60 str=props.getProperty("bind_addr"); 61 if(str != null) { 62 try { 63 bind_addr=InetAddress.getByName(str); 64 } 65 catch(UnknownHostException unknown) { 66 if(log.isFatalEnabled()) log.fatal("(bind_addr): host " + str + " not known"); 67 return false; 68 } 69 props.remove("bind_addr"); 70 } 71 72 str=props.getProperty("mcast_addr"); 73 if(str != null) { 74 try { 75 mcast_addr=InetAddress.getByName(str); 76 } 77 catch(UnknownHostException e) { 78 log.error("could not resolve " + str, e); 79 return false; 80 } 81 props.remove("mcast_addr"); 82 } 83 84 str=props.getProperty("mcast_port"); 85 if(str != null) { 86 mcast_port=Integer.parseInt(str); 87 props.remove("mcast_port"); 88 } 89 90 str=props.getProperty("ip_ttl"); 91 if(str != null) { 92 ip_ttl=Integer.parseInt(str); 93 props.remove("ip_ttl"); 94 } 95 96 str=props.getProperty("bind_to_all_interfaces"); 97 if(str != null) { 98 bind_to_all_interfaces=new Boolean (str).booleanValue(); 99 props.remove("bind_to_all_interfaces"); 100 } 101 102 if(mcast_addr == null) 103 try { 104 mcast_addr=InetAddress.getByName("230.5.6.7"); 105 } 106 catch(UnknownHostException e) { 107 log.error("failed getting default mcast address", e); 108 return false; 109 } 110 111 return super.setProperties(props); 112 } 113 114 115 public void start() throws Exception { 116 int jdk_version=Util.getJavaVersion(); 117 118 mcast_sock=new MulticastSocket(mcast_port); 119 mcast_sock.setTimeToLive(ip_ttl); 120 121 if(bind_to_all_interfaces && jdk_version >= 14) { 122 bindToAllInterfaces(); 123 if(bind_addr != null) 125 mcast_sock.setNetworkInterface(NetworkInterface.getByInetAddress(bind_addr)); 126 } 127 else { 128 if(bind_addr == null) { 129 InetAddress[] interfaces=InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress()); 130 if(interfaces != null && interfaces.length > 0) 131 bind_addr=interfaces[0]; 132 } 133 if(bind_addr == null) 134 bind_addr=InetAddress.getLocalHost(); 135 136 if(bind_addr != null) 137 if(log.isInfoEnabled()) log.info("sockets will use interface " + bind_addr.getHostAddress()); 138 139 140 if(bind_addr != null) { 141 mcast_sock.setInterface(bind_addr); 142 } 144 mcast_sock.joinGroup(mcast_addr); 145 } 146 147 startReceiver(); 148 super.start(); 149 } 150 151 152 153 154 private void bindToAllInterfaces() throws IOException { 155 SocketAddress tmp_mcast_addr=new InetSocketAddress(mcast_addr, mcast_port); 156 Enumeration en=NetworkInterface.getNetworkInterfaces(); 157 while(en.hasMoreElements()) { 158 NetworkInterface i=(NetworkInterface)en.nextElement(); 159 for(Enumeration en2=i.getInetAddresses(); en2.hasMoreElements();) { 160 InetAddress addr=(InetAddress)en2.nextElement(); 161 mcast_sock.joinGroup(tmp_mcast_addr, i); 164 if(log.isTraceEnabled()) 165 log.trace("joined " + tmp_mcast_addr + " on interface " + i.getName() + " (" + addr + ")"); 166 break; 167 } 168 } 169 } 170 171 private void startReceiver() { 172 if(receiver == null || !receiver.isAlive()) { 173 receiver=new Thread (this, "ReceiverThread"); 174 receiver.setDaemon(true); 175 receiver.start(); 176 if(log.isTraceEnabled()) 177 log.trace("receiver thread started"); 178 } 179 } 180 181 public void stop() { 182 mcast_sock.close(); 183 mcast_sock=null; 184 receiver=null; 185 super.stop(); 186 } 187 188 void sendMcastDiscoveryRequest(Message msg) { 189 Buffer buf; 190 DatagramPacket packet; 191 192 try { 193 if(msg.getSrc() == null) 194 msg.setSrc(local_addr); 195 out_stream.reset(); 196 DataOutputStream out=new DataOutputStream(out_stream); 197 msg.writeTo(out); 198 out.close(); buf=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size()); 200 packet=new DatagramPacket(buf.getBuf(), buf.getOffset(), buf.getLength(), mcast_addr, mcast_port); 201 mcast_sock.send(packet); 202 } 203 catch(IOException ex) { 204 log.error("failed sending discovery request", ex); 205 } 206 } 207 208 209 210 public void run() { 211 DatagramPacket packet=new DatagramPacket(receive_buf, receive_buf.length); 212 byte[] data; 213 ByteArrayInputStream inp_stream=null; 214 DataInputStream inp=null; 215 Message msg=null; 216 217 while(mcast_sock != null && receiver != null && Thread.currentThread().equals(receiver)) { 218 packet.setData(receive_buf, 0, receive_buf.length); 219 try { 220 mcast_sock.receive(packet); 221 data=packet.getData(); 222 inp_stream=new ByteArrayInputStream(data, 0, data.length); 223 inp=new DataInputStream(inp_stream); 224 msg=new Message(); 225 msg.readFrom(inp); 226 up(new Event(Event.MSG, msg)); 227 } 228 catch(SocketException socketEx) { 229 break; 230 } 231 catch(Exception ex) { 232 log.error("failed receiving packet", ex); 233 } 234 finally { 235 closeInputStream(inp); 236 closeInputStream(inp_stream); 237 } 238 } 239 if(log.isTraceEnabled()) 240 log.trace("receiver thread terminated"); 241 } 242 243 private void closeInputStream(InputStream inp) { 244 if(inp != null) 245 try {inp.close();} catch(IOException e) {} 246 } 247 } 248 | Popular Tags |