1 21 22 package org.objectweb.jonas.discovery; 23 24 import java.io.ByteArrayInputStream ; 25 import java.io.IOException ; 26 import java.io.ObjectInputStream ; 27 import java.net.DatagramPacket ; 28 import java.net.DatagramSocket ; 29 import java.net.InetAddress ; 30 import java.net.MulticastSocket ; 31 import java.net.SocketException ; 32 import java.net.UnknownHostException ; 33 34 import javax.management.Notification ; 35 36 import org.objectweb.jonas.common.Log; 37 import org.objectweb.jonas.service.ServiceException; 38 import org.objectweb.util.monolog.api.BasicLevel; 39 import org.objectweb.util.monolog.api.Logger; 40 41 45 public class DiscoveryClientListener implements Runnable { 46 47 50 public static final String DISCOVERY_TYPE = "jonas.management.discovery"; 51 52 private static int RECEIVE_BUFFER_SIZE = 1024; 53 54 57 private DiscoveryClient discoveryClient; 58 59 62 private MulticastSocket multicastSocket; 63 64 67 private int port; 68 69 72 private InetAddress destAddress; 73 74 77 private int ttl; 78 79 82 private DatagramSocket unicastSocket; 83 84 87 private String sourceIp; 88 89 92 private int sourcePort; 93 94 97 private boolean notStopped = true; 98 99 102 private long timeout = 1000; 103 104 107 private long sequenceNumber = 0; 108 109 112 private static Logger logger = Log.getLogger(Log.JONAS_DISCOVERY_PREFIX); 113 114 118 public DiscoveryClientListener(DiscoveryClient discoveryClient) { 119 this.port = discoveryClient.getListeningPort(); 120 try { 121 this.destAddress = InetAddress.getByName(discoveryClient.getListeningIp()); 122 this.ttl = discoveryClient.getTimeToLive(); 123 } catch (UnknownHostException e) { 124 logger.log(BasicLevel.ERROR, "Invalid host", e); 125 } 126 this.timeout = discoveryClient.getTimeout(); 127 this.sourcePort = discoveryClient.getSourcePort(); 128 this.sourceIp = discoveryClient.getSourceIp(); 129 this.discoveryClient = discoveryClient; 130 131 try { 133 unicastSocket = new DatagramSocket (sourcePort); 134 } catch (SocketException e2) { 135 logger.log(BasicLevel.ERROR, "DiscoveryClient : Unable to create a Datagram socket", e2); 136 throw new ServiceException( 138 "Could not create socket to listen for discovery " 139 + "messages at port: " + sourcePort 140 + ". The port might be in use."); 141 } 142 } 143 144 147 public void sendDiscoveryMessage(DiscMessage msg) { 148 if (logger.isLoggable(BasicLevel.DEBUG)) { 149 logger.log(BasicLevel.DEBUG, "DiscoveryClient : The message to send is " + msg); 150 } 151 byte[] messageBytes = DiscMessage.objectToBytes(msg); 154 if (messageBytes != null) { 155 try { 156 multicastSocket.send(new DatagramPacket (messageBytes, messageBytes.length, destAddress, port)); 157 } catch (IOException e1) { 158 logger.log(BasicLevel.ERROR, "DiscoveryClient : Error to send discovery message", e1); 159 } 160 } 161 } 162 163 166 public void run() { 167 DatagramPacket datagram = new DatagramPacket (new byte[RECEIVE_BUFFER_SIZE], RECEIVE_BUFFER_SIZE); 169 Object objReceived = null; 171 ObjectInputStream in = null; 172 173 try { 175 multicastSocket = new MulticastSocket (port); 176 multicastSocket.setTimeToLive(ttl); 177 181 } catch (IOException e) { 183 e.printStackTrace(); 185 } 186 187 201 202 DiscMessage msg = new DiscMessage(sourceIp, sourcePort); 204 sendDiscoveryMessage(msg); 206 if (logger.isLoggable(BasicLevel.DEBUG)) { 207 logger.log(BasicLevel.DEBUG, " DiscoveryClient: Sent Message is" + msg); 208 } 209 210 long lastTime = timeout + System.currentTimeMillis(); 212 DiscEvent event = null; 213 try { 214 while ((notStopped) && System.currentTimeMillis() <= lastTime) { 215 unicastSocket.receive(datagram); 216 in = new ObjectInputStream (new ByteArrayInputStream (datagram.getData())); 217 objReceived = in.readObject(); 218 if (objReceived != null) { 219 if (objReceived instanceof DiscEvent) { 220 event = (DiscEvent) objReceived; 221 event.setSourceAddress(datagram.getAddress().getHostAddress()); 223 handleReceivedMessage(event); 224 } 225 } 226 } 227 } catch (SocketException e) { 228 logger.log(BasicLevel.ERROR, "DiscoveryClient : Socket closed", e); 229 notStopped = false; 230 } catch (IOException e1) { 231 logger.log(BasicLevel.ERROR, "DiscoveryClient IOException", e1); 232 } catch (ClassNotFoundException e) { 233 logger.log(BasicLevel.ERROR, "DiscoveryClient ClassNotFoundException ", e); 234 } 235 } 236 237 240 private void handleReceivedMessage(DiscEvent msg) { 241 if (logger.isLoggable(BasicLevel.DEBUG)) { 242 logger.log(BasicLevel.DEBUG, "discovery event received: " + msg); 243 } 244 245 Notification notif = new Notification (DISCOVERY_TYPE, discoveryClient, sequenceNumber++, System 247 .currentTimeMillis(), msg.getState()); 248 notif.setUserData(msg); 249 discoveryClient.sendNotification(notif); 250 } 251 252 255 public void stop() { 256 notStopped = false; 257 Thread.interrupted(); 258 } 259 } | Popular Tags |