1 46 package org.mr.plugins.discovery; 47 48 49 import java.net.InetAddress ; 50 import java.net.NetworkInterface ; 51 import java.util.ArrayList ; 52 import java.util.Collection ; 53 import java.util.Enumeration ; 54 import java.util.Iterator ; 55 import java.util.List ; 56 57 58 import org.mr.MantaAgent; 59 60 import org.mr.MantaException; 61 import org.mr.core.groups.GroupsException; 62 import org.mr.core.groups.MutlicastGroupManager; 63 import org.apache.commons.logging.Log; 64 import org.apache.commons.logging.LogFactory; 65 import org.mr.core.net.LocalTransport; 66 import org.mr.core.net.TransportInfo; 67 import org.mr.core.protocol.MantaBusMessage; 68 import org.mr.core.protocol.MantaBusMessageConsts; 69 import org.mr.core.util.byteable.ByteableList; 70 import org.mr.core.util.byteable.ByteableMap; 71 import org.mr.kernel.control.ControlSignalMessageSender; 72 import org.mr.kernel.services.ServiceActor; 73 import org.mr.kernel.services.ServiceActorInfoContainer; 74 import org.mr.kernel.services.ServiceProducer; 75 import org.mr.kernel.world.WorldModeler; 76 77 83 public class ADControlSender extends ControlSignalMessageSender implements Runnable { 84 85 private boolean running = true; 86 87 private Log log = null; 88 private MantaAgent agent = null; 89 private WorldModeler worldModeler = null; 90 private ByteableMap infoMap = null; 91 public static ServiceProducer serviceProducer = null; 92 93 private String agentName = null; 94 private ByteableList serviceActorList = null; 95 private ByteableList removedDurableList = null; 96 97 private MutlicastGroupManager groupsManager = null; 98 private ByteableList sendInfoTempList; 99 100 public ADControlSender() throws GroupsException { 101 agent = MantaAgent.getInstance(); 102 agentName = agent.getAgentName(); 103 serviceProducer = new ServiceProducer(agentName, "AD",(byte)0); 104 worldModeler = agent.getSingletonRepository().getWorldModeler(); 105 groupsManager = agent.getSingletonRepository().getGroupsManager(); 106 serviceActorList = new ByteableList(); 107 removedDurableList = new ByteableList(); 108 log=LogFactory.getLog("ADControlSender"); 109 infoMap = new ByteableMap(); 110 infoMap.put(AutoDiscoveryPlugin.AGENT_NAME, agentName); 111 sendInfoTempList = new ByteableList(); 112 } 114 115 121 public void advertiseService(ServiceActor serviceActor, MantaAgent agent) throws MantaException { 122 ServiceActorInfoContainer serviceActorInfoContainer = new ServiceActorInfoContainer(); 124 serviceActorInfoContainer.setActor(serviceActor); 125 synchronized(serviceActorList){ 126 serviceActorList.add(serviceActorInfoContainer); 127 removedDurableList.remove(serviceActorInfoContainer); 128 } 129 130 131 try { 132 sendInfo(); 133 } 134 catch (Exception e) { 135 e.printStackTrace(); 136 throw new MantaException(e.toString(), MantaException.ID_RECEIVE_GENERAL_FAIL); 137 } 138 139 140 } 142 143 149 public void recallService(ServiceActor serviceActor, MantaAgent agent) throws MantaException { 150 ArrayList tempList = null; 152 synchronized(serviceActorList){ 153 tempList = new ArrayList (); 154 tempList.addAll(serviceActorList); 155 } 156 Iterator actors = tempList.iterator(); 157 while(actors.hasNext()){ 158 ServiceActorInfoContainer saic = (ServiceActorInfoContainer) actors.next(); 159 if(saic.equals(serviceActor)){ 160 synchronized(serviceActorList){ 161 serviceActorList.remove(saic); 162 } 163 } 164 165 } 166 try { 167 sendInfo(); 168 } 169 catch (Exception e) { 170 throw new MantaException(e.toString(), MantaException.ID_RECEIVE_GENERAL_FAIL); 171 } 172 } 174 public void recallDurableSubscription(ServiceActor serviceActor, MantaAgent agent) throws MantaException { 175 ServiceActorInfoContainer serviceActorInfoContainer = new ServiceActorInfoContainer(); 176 serviceActorInfoContainer.setActor(serviceActor); 177 178 removedDurableList.add(serviceActorInfoContainer); 179 180 try { 181 sendInfo(); 182 } 183 catch (Exception e) { 184 e.printStackTrace(); 185 throw new MantaException(e.toString(), MantaException.ID_RECEIVE_GENERAL_FAIL); 186 } 187 188 } 189 190 191 private synchronized void sendInfo() throws Exception { 192 sendInfoTempList.clear(); 193 Collection localTransports = MantaAgent.getInstance().getSingletonRepository().getNetworkManager().getLocalTransports(); 194 List updatedTransportsInfo = new ArrayList (); 195 Iterator i = localTransports.iterator(); 196 while (i.hasNext()) { 197 TransportInfo info =((LocalTransport) i.next()).getInfo(); 198 String myIp = info.getIp().getHostAddress(); 199 if(myIp.equals("0.0.0.0") ){ 200 String validLocalAddress = getValidLocalAddress(); 201 if(validLocalAddress != null){ 202 info= new TransportInfo(validLocalAddress,info.getPort(), info.getTransportInfoType().toString()); 203 } 204 205 } 206 updatedTransportsInfo.add(info); 207 208 } 209 211 212 213 if(localTransports == null){ 214 if(log.isFatalEnabled()){ 215 log.fatal("There are no transports for this layer. Please check the configuration and the log files for errors."); 216 } return; 218 } 219 ByteableList list = new ByteableList(updatedTransportsInfo); 220 221 222 infoMap.put(AutoDiscoveryPlugin.TRANSPORT_INFO, list); 223 synchronized(serviceActorList){ 224 sendInfoTempList.addAll(serviceActorList); 225 } 226 infoMap.put(AutoDiscoveryPlugin.SERVICE_ACTOR, sendInfoTempList); 227 infoMap.put(AutoDiscoveryPlugin.REMOVED_DURABLE, removedDurableList); 228 229 MantaBusMessage msg = MantaBusMessage.getInstance(); 230 msg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CONTROL); 231 232 msg.setPayload(infoMap); 234 msg.setSource(serviceProducer); 235 236 groupsManager.sendMessageToGroup(AutoDiscoveryPlugin.groupKey, AutoDiscoveryPlugin.MANTA_AD_SUBJECT_NAME+worldModeler.getDefaultDomainName(), msg); 237 removedDurableList.clear(); 238 } 240 241 static String validAddress = null; 242 public static synchronized String getValidLocalAddress() { 243 244 try { 248 Enumeration ifs = NetworkInterface.getNetworkInterfaces(); 249 while (ifs.hasMoreElements()) { 250 NetworkInterface iface = (NetworkInterface ) 251 ifs.nextElement(); 252 Enumeration ips = iface.getInetAddresses(); 253 while (ips.hasMoreElements()) { 254 InetAddress ip = (InetAddress ) ips.nextElement(); 255 if (!ip.getHostAddress().equals("127.0.0.1")) { 256 validAddress =ip.getHostAddress(); 257 return validAddress; 258 } 259 } 260 } 261 } catch (Throwable t) {} 262 validAddress ="127.0.0.1"; 263 return validAddress; } 265 266 271 public void run() { 272 while (running) { 273 try { 274 sendInfo(); 275 Thread.sleep(AutoDiscoveryPlugin.refreshInterval); 276 } 277 catch (InterruptedException e) { 278 if(log.isErrorEnabled()){ 279 log.error("InterruptedException at run().", e); 280 } } catch (Exception e) { 283 if(log.isErrorEnabled()){ 284 log.error("Could not send message to AutoDiscovery Group.", e); 285 } } } } 290 293 public boolean isRunning() { 294 return running; 295 } 297 301 public void setRunning(boolean running) { 302 this.running = running; 303 } 305 306 } | Popular Tags |