1 23 24 package org.objectweb.jonas.discovery; 25 26 import java.io.ByteArrayInputStream ; 27 import java.io.IOException ; 28 import java.io.ObjectInputStream ; 29 import java.net.DatagramPacket ; 30 import java.net.DatagramSocket ; 31 import java.net.InetAddress ; 32 import java.net.MulticastSocket ; 33 import java.net.SocketException ; 34 import java.net.UnknownHostException ; 35 36 import org.objectweb.jonas.common.Log; 37 import org.objectweb.jonas.common.NetUtils; 38 import org.objectweb.util.monolog.api.BasicLevel; 39 import org.objectweb.util.monolog.api.Logger; 40 41 45 public class DiscoveryComm implements Runnable { 46 49 public static final int RECEIVE_BUFFER_SIZE = 1024; 50 53 protected MulticastSocket multicastSocket; 54 57 protected DatagramSocket unicastSocket; 58 59 62 private DiscoveryManager dm; 63 66 private int port; 67 70 private InetAddress destAddress; 71 74 protected boolean notStopped = true; 75 78 private int ttl = 1; 80 83 protected String jonasName = null; 84 87 protected String domainName = null; 88 89 92 protected String serverId = null; 93 94 95 98 protected String [] urls = null; 99 100 103 private static Logger logger = Log.getLogger(Log.JONAS_DISCOVERY_PREFIX); 104 108 public DiscoveryComm(DiscoveryManager dm) { 109 this.port = dm.getListeningPort(); 110 try { 111 this.destAddress = InetAddress.getByName(dm.getListeningIp()); 112 this.ttl = dm.getTimeToLive(); 113 this.jonasName = dm.getJonasName(); 114 this.domainName = dm.getDomainName(); 115 this.urls = dm.getUrls(); 116 this.serverId = dm.getServerId(); 117 } catch (UnknownHostException e) { 118 logger.log(BasicLevel.ERROR, "Unknown Host", e); 119 } 120 this.dm = dm; 121 } 122 123 128 protected void join() { 129 try { 130 multicastSocket = new MulticastSocket (port); 131 multicastSocket.setTimeToLive(ttl); 132 multicastSocket.joinGroup(destAddress); 133 if (logger.isLoggable(BasicLevel.DEBUG)) { 134 logger.log(BasicLevel.DEBUG, "multicast ip address is " 135 + destAddress); 136 logger.log(BasicLevel.DEBUG, "multicast port is " + port); 137 } 138 } catch (IOException e) { 139 logger.log(BasicLevel.ERROR, "io problem", e); 140 } 141 } 142 143 147 public void sendNotif(DiscMessage msg) { 148 try { 149 if (logger.isLoggable(BasicLevel.DEBUG)) { 152 logger.log(BasicLevel.DEBUG, msg); 153 } 154 byte[] messageBytes = DiscMessage.objectToBytes(msg); 155 multicastSocket.send(new DatagramPacket (messageBytes, 156 messageBytes.length, destAddress, port)); 157 } catch (IOException e1) { 158 logger.log(BasicLevel.ERROR, "DiscoveryComm: Error to send notification", e1); 159 } 160 161 } 162 169 protected void sendResponse(DiscMessage msg, InetAddress destAddress, int port) { 170 if (logger.isLoggable(BasicLevel.DEBUG)) { 171 logger.log(BasicLevel.DEBUG, "DiscoveryComm : The message to send is " 172 + msg + "Sending it to: " + destAddress + " and port is: " + port); 173 } 174 byte[] messageBytes = DiscMessage.objectToBytes(msg); 175 if (messageBytes != null) { 176 try { 177 unicastSocket.send(new DatagramPacket (messageBytes, 179 messageBytes.length, destAddress, port)); 180 } catch (IOException e) { 181 logger.log(BasicLevel.ERROR, "DiscoveryComm: Error to send response to discovery message", e); 182 } 183 } 184 } 185 186 196 public DiscEvent createNotifMessage(String state) throws Exception { 197 String theHostAddress; 198 try { 199 theHostAddress = NetUtils.getLocalAddress(); 200 } catch (UnknownHostException e) { 201 logger.log(BasicLevel.ERROR, "Unknown host", e); 202 return null; 203 } 204 205 if (!state.equals(DiscEvent.RUNNING)) { 206 urls = null; 207 } 208 DiscEvent resp = new DiscEvent(theHostAddress, port, jonasName, domainName, serverId, urls); 211 resp.setState(state); 212 return resp; 213 } 214 215 216 220 public void run() { 221 DatagramPacket datagram = new DatagramPacket (new byte[RECEIVE_BUFFER_SIZE], 223 RECEIVE_BUFFER_SIZE); 224 Object objReceived = null; 226 ObjectInputStream in = null; 227 228 join(); 230 DiscEvent msg = null; 232 try { 233 msg = createNotifMessage(DiscEvent.RUNNING); 234 } catch (Exception e) { 235 logger.log(BasicLevel.ERROR, 236 "DiscoveryComm: Unable to create a notification message", e); 237 } 238 if (msg != null) { 239 sendNotif(msg); 241 } 242 try { 244 unicastSocket = new DatagramSocket (); 245 } catch (SocketException e3) { 246 logger.log(BasicLevel.ERROR, "Socket exception", e3); 247 return; 248 } 249 try { 250 while (notStopped) { 251 multicastSocket.receive(datagram); 252 in = new ObjectInputStream (new ByteArrayInputStream (datagram.getData())); 253 objReceived = in.readObject(); 254 255 if (objReceived != null) { 256 if (objReceived instanceof DiscMessage) { 258 if ((objReceived instanceof DiscEvent) || (objReceived instanceof DiscGreeting) ) { 259 if (logger.isLoggable(BasicLevel.DEBUG)) { 260 logger.log(BasicLevel.DEBUG, 261 "This discovery event is ignored " + objReceived); 262 } 263 } else { 264 DiscMessage request = (DiscMessage) objReceived; 265 if (logger.isLoggable(BasicLevel.DEBUG)) { 266 logger.log(BasicLevel.DEBUG, 267 "A dicovery message is received " 268 + objReceived); 269 } 270 271 274 284 if (msg != null) { 285 InetAddress destAddress = datagram.getAddress(); 287 int destPort = request.getSourcePort(); 288 sendResponse(msg, destAddress, destPort); 289 } 290 } 291 } 292 } 293 } 294 } catch (SocketException e) { 295 logger.log(BasicLevel.ERROR, "Socket closed: ", e); 296 notStopped = false; 297 } catch (IOException e1) { 298 logger.log(BasicLevel.ERROR, e1); 299 } catch (ClassNotFoundException e) { 300 logger.log(BasicLevel.ERROR, e); 301 } 302 } 303 304 308 public void stop() { 309 DiscEvent msg = null; 311 try { 312 if (logger.isLoggable(BasicLevel.DEBUG)) { 313 logger.log(BasicLevel.DEBUG, "Sending a STOPPING DiscEvent."); 314 } 315 msg = createNotifMessage(DiscEvent.STOPPING); 316 } catch (Exception e) { 317 logger.log(BasicLevel.ERROR, e); 318 } 319 if (msg != null) { 320 sendNotif(msg); 321 } 322 Thread.interrupted(); 323 } 324 325 328 protected void setJonasName(String jonasName) { 329 this.jonasName = jonasName; 330 } 331 334 protected void setDomainName(String domainName) { 335 this.domainName = domainName; 336 } 337 340 protected void setUrls(String [] urls) { 341 this.urls = urls; 342 } 343 344 public String getServerId() { 345 return serverId; 346 } 347 348 public void setServerId(String serverId) { 349 this.serverId = serverId; 350 } 351 } | Popular Tags |