1 24 25 package org.objectweb.tribe.gms.discovery; 26 27 import java.io.IOException ; 28 import java.net.DatagramPacket ; 29 import java.net.MulticastSocket ; 30 import java.net.SocketException ; 31 import java.util.ArrayList ; 32 33 import org.objectweb.tribe.common.GroupIdentifier; 34 import org.objectweb.tribe.common.IpAddress; 35 import org.objectweb.tribe.common.log.Trace; 36 import org.objectweb.tribe.gms.protocol.GroupDiscoveryMessage; 37 import org.objectweb.tribe.messages.DatagramMessage; 38 39 48 public class UdpDiscoveryService extends Thread implements DiscoveryService 49 { 50 private static final int RECEIVE_BUFFER_SIZE = 1024; 51 52 private MulticastSocket multicastSocket; 53 private IpAddress multicastAddress; 54 private IpAddress replyAddress; 55 private ArrayList listeners; 56 private boolean isKilled = false; 57 58 private Trace logger = Trace 59 .getLogger("org.objectweb.tribe.discovery"); 60 61 67 public UdpDiscoveryService(IpAddress multicastAddress, IpAddress replyAddress) 68 { 69 super("UdpDiscoveryService"); 70 if (multicastAddress == null) 71 throw new IllegalArgumentException ( 72 "Null addresses in UdpDiscoveryService constructor"); 73 74 listeners = new ArrayList (); 75 this.replyAddress = replyAddress; 76 try 77 { 78 logger.info("Creating UdpDiscoveryService on " + multicastAddress); 80 this.multicastAddress = multicastAddress; 81 multicastSocket = new MulticastSocket (multicastAddress.getPort()); 82 multicastSocket.joinGroup(multicastAddress.getAddress()); 83 start(); 84 } 85 catch (SocketException e) 86 { 87 logger.error("Failed to create multicast socket", e); 88 } 89 catch (IOException e) 90 { 91 logger.error("Failed to create multicast socket", e); 92 } 93 } 94 95 98 public void sendGroupDiscovery(GroupIdentifier gid) 99 { 100 try 101 { 102 if (logger.isDebugEnabled()) 103 logger.debug("Sending GroupDiscoveryMessage for group " + gid 104 + " reply to " + replyAddress); 105 DatagramPacket joinPacket = new GroupDiscoveryMessage(replyAddress, 106 multicastAddress, gid).getDatagramPacket(); 107 multicastSocket.send(joinPacket); 108 } 109 catch (IOException e) 110 { 111 if (logger.isDebugEnabled()) 112 logger.debug("Failed to send group discovery message for group " + gid, 113 e); 114 else if (logger.isInfoEnabled()) 115 logger.info("Failed to send group discovery message for group " + gid 116 + " (" + e + ")"); 117 } 118 } 119 120 123 public void registerDiscoveryListener(DiscoveryListener listener) 124 { 125 synchronized (listeners) 126 { 127 listeners.add(listener); 128 } 129 } 130 131 134 public boolean unregisterDiscoveryListener(DiscoveryListener listener) 135 { 136 synchronized (listeners) 137 { 138 return listeners.remove(listener); 139 } 140 } 141 142 146 public void run() 147 { 148 DatagramPacket datagram = new DatagramPacket (new byte[RECEIVE_BUFFER_SIZE], 149 RECEIVE_BUFFER_SIZE); 150 151 while (!isKilled) 152 { 153 try 154 { 155 multicastSocket.receive(datagram); 156 Object received = DatagramMessage.getObjectFromDatagram(datagram); 157 if (received != null) 158 { 159 if (received instanceof GroupDiscoveryMessage) 160 { 161 GroupDiscoveryMessage msg = (GroupDiscoveryMessage) received; 162 if (logger.isDebugEnabled()) 163 logger.debug("Received GroupDiscoveryMessage for group " 164 + msg.getGroupIdentifier() + " from " 165 + msg.getSourceAddress()); 166 synchronized (listeners) 168 { 169 int size = listeners.size(); 170 for (int i = 0; i < size; i++) 171 { 172 DiscoveryListener listener = (DiscoveryListener) listeners 173 .get(i); 174 listener.discoveryRequest(msg.getGroupIdentifier(), msg 175 .getSourceAddress()); 176 } 177 } 178 } 179 else 180 logger.info("UdpDiscoveryService received an unexpected message: " 181 + received); 182 } 183 } 184 catch (IOException e) 185 { 186 logger.error("Error while receiving message", e); 187 } 188 } 189 } 190 191 196 public IpAddress getReplyAddress() 197 { 198 return replyAddress; 199 } 200 201 206 public void setReplyAddress(IpAddress replyAddress) 207 { 208 this.replyAddress = replyAddress; 209 } 210 211 214 public void kill() 215 { 216 isKilled = true; 217 multicastSocket.close(); 218 this.interrupt(); 219 try 220 { 221 this.join(1000); 222 } 223 catch (InterruptedException e) 224 { 225 } 226 } 227 } | Popular Tags |