package org.objectweb.jonas.discovery;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.SocketException;
import java.net.UnknownHostException;

import javax.management.Notification;
import org.objectweb.jonas.common.Log;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

/**
 * Listens on a multicast group for serialized {@link DiscEvent} messages and
 * forwards each one to the owning {@link Enroller} as a JMX
 * {@link Notification} of type {@link #DISCOVERY_TYPE}.
 *
 * <p>The listener is a {@link Runnable}: {@link #run()} joins the group and
 * blocks receiving datagrams until {@link #stopListener()} is called or the
 * socket fails. {@code stopListener()} is expected to be called from a thread
 * other than the one executing {@code run()}.</p>
 */
public class DiscoveryListener implements Runnable {

    /**
     * Type of the notifications sent to the enroller for each discovery event.
     */
    public static final String DISCOVERY_TYPE = "jonas.management.discovery";

    /**
     * Size in bytes of the buffer used to receive one datagram.
     */
    private static final int RECEIVE_BUFFER_SIZE = 1024;

    /**
     * Logger shared by all listener instances.
     */
    private static final Logger logger = Log.getLogger(Log.JONAS_DISCOVERY_PREFIX);

    /**
     * Port the multicast socket is bound to (taken from the enroller).
     */
    private int port;

    /**
     * Multicast group address to join; {@code null} when the address supplied
     * by the enroller could not be resolved.
     */
    private InetAddress destAddress;

    /**
     * Enroller that receives the notifications and is used as their source.
     */
    private Enroller enroller;

    /**
     * Time-to-live applied to the multicast socket.
     */
    private int ttl = 1;

    /**
     * Socket used to receive discovery datagrams. Volatile because it is
     * created by the thread executing {@link #run()} and closed by the thread
     * calling {@link #stopListener()}.
     */
    private volatile MulticastSocket multicastSocket;

    /**
     * Loop condition of {@link #run()}. Volatile so that the update made by
     * {@link #stopListener()} is visible to the receiving thread.
     */
    private volatile boolean notStopped = true;

    /**
     * Sequence number of the next notification; only touched by the thread
     * executing {@link #run()}.
     */
    private long sequenceNumber = 0;

    /**
     * Creates a listener configured from the given enroller (listening port,
     * listening IP address and time-to-live).
     *
     * @param enroller enroller providing the configuration and receiving the
     *                 notifications
     */
    public DiscoveryListener(Enroller enroller) {
        this.enroller = enroller;
        this.port = enroller.getListeningPort();
        // The TTL does not depend on address resolution, so read it outside
        // the try block: a resolution failure must not silently reset it.
        this.ttl = enroller.getTimeToLive();
        try {
            this.destAddress = InetAddress.getByName(enroller.getListeningIp());
        } catch (UnknownHostException e) {
            // destAddress stays null; join() reports the problem and run() exits.
            logger.log(BasicLevel.ERROR, e);
        }
    }

    /**
     * Creates the multicast socket and joins the multicast group.
     *
     * @return {@code true} when the socket is ready to receive, {@code false}
     *         when the group address is unknown or the socket could not be
     *         set up (in which case no socket is left open)
     */
    private boolean join() {
        if (destAddress == null) {
            logger.log(BasicLevel.ERROR,
                    "cannot join multicast group: address could not be resolved");
            return false;
        }
        MulticastSocket socket = null;
        try {
            socket = new MulticastSocket(port);
            socket.setTimeToLive(ttl);
            socket.joinGroup(destAddress);
            multicastSocket = socket;
            if (logger.isLoggable(BasicLevel.DEBUG)) {
                logger.log(BasicLevel.DEBUG, "multicast ip address is "
                        + destAddress);
                logger.log(BasicLevel.DEBUG, "multicast port is " + port);
            }
            return true;
        } catch (IOException e) {
            logger.log(BasicLevel.ERROR, "io problem", e);
            // Do not leak a half-initialised socket.
            if (socket != null) {
                socket.close();
            }
            return false;
        }
    }

    /**
     * Wraps a discovery event in a notification and hands it to the enroller.
     *
     * @param msg discovery event received from the network
     */
    private void handleDiscEvent(DiscEvent msg) {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "discovery event received: " + msg);
        }

        Notification notif = new Notification(DISCOVERY_TYPE, enroller,
                sequenceNumber++, System.currentTimeMillis(), msg.getState());
        notif.setUserData(msg);
        enroller.sendNotification(notif);
    }

    /**
     * Deserializes the object carried by a received datagram.
     *
     * @param datagram datagram that has just been filled by {@code receive}
     * @return the deserialized object, or {@code null} when the payload could
     *         not be decoded (the failure is logged)
     */
    private Object readPayload(DatagramPacket datagram) {
        // NOTE(review): this performs native Java deserialization of data
        // received from the network. Any host able to send to the multicast
        // group can supply the bytes; consider restricting the accepted
        // classes or moving to a safer wire format.
        ObjectInputStream in = null;
        try {
            // Only decode the bytes that belong to this datagram, not the
            // whole receive buffer.
            in = new ObjectInputStream(new ByteArrayInputStream(
                    datagram.getData(), datagram.getOffset(), datagram.getLength()));
            return in.readObject();
        } catch (IOException e) {
            logger.log(BasicLevel.WARN, "cannot decode discovery datagram", e);
            return null;
        } catch (ClassNotFoundException e) {
            logger.log(BasicLevel.WARN, "unknown class in discovery datagram", e);
            return null;
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException ignored) {
                    // Closing an in-memory stream; nothing useful to report.
                }
            }
        }
    }

    /**
     * Leaves the multicast group (when still possible) and closes the socket.
     */
    private void closeSocket() {
        MulticastSocket socket = multicastSocket;
        if (socket == null) {
            return;
        }
        if (!socket.isClosed()) {
            try {
                socket.leaveGroup(destAddress);
            } catch (IOException e) {
                if (logger.isLoggable(BasicLevel.DEBUG)) {
                    logger.log(BasicLevel.DEBUG, "could not leave multicast group", e);
                }
            }
        }
        socket.close();
    }

    /**
     * Joins the multicast group, then receives datagrams until the listener is
     * stopped or the socket fails. Every received {@link DiscEvent} is stamped
     * with the sender's host address and forwarded to the enroller; datagrams
     * that cannot be decoded or carry another type are ignored.
     */
    public void run() {
        if (!join()) {
            return;
        }

        byte[] buffer = new byte[RECEIVE_BUFFER_SIZE];
        DatagramPacket datagram = new DatagramPacket(buffer, buffer.length);

        try {
            while (notStopped) {
                // receive() stores the received size in the packet; restore
                // the full capacity before reusing it.
                datagram.setLength(buffer.length);
                multicastSocket.receive(datagram);

                Object objReceived = readPayload(datagram);
                if (objReceived instanceof DiscEvent) {
                    DiscEvent msg = (DiscEvent) objReceived;
                    msg.setSourceAddress(datagram.getAddress().getHostAddress());
                    handleDiscEvent(msg);
                }
            }
        } catch (SocketException e) {
            if (notStopped) {
                logger.log(BasicLevel.ERROR, "Enroller: Socket closed" + e);
                notStopped = false;
            } else if (logger.isLoggable(BasicLevel.DEBUG)) {
                // Expected: stopListener() closed the socket to unblock receive().
                logger.log(BasicLevel.DEBUG, "discovery listener stopped");
            }
        } catch (IOException e) {
            logger.log(BasicLevel.ERROR, "io problem while receiving discovery datagram", e);
        } finally {
            closeSocket();
        }
    }

    /**
     * Asks the listener to stop. The socket is closed so that a thread blocked
     * in {@code receive} inside {@link #run()} returns immediately.
     */
    public void stopListener() {
        notStopped = false;
        MulticastSocket socket = multicastSocket;
        if (socket != null) {
            socket.close();
        }
    }

}