package org.mr.plugins.discovery;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mr.MantaAgent;
import org.mr.MantaException;
import org.mr.core.configuration.ConfigManager;
import org.mr.core.groups.GroupKey;
import org.mr.core.groups.GroupMessageListener;
import org.mr.core.groups.GroupsException;
import org.mr.core.groups.MutlicastGroupManager;
import org.mr.core.net.TransportInfo;
import org.mr.core.net.TransportType;
import org.mr.core.protocol.MantaBusMessage;
import org.mr.core.util.SystemTime;
import org.mr.core.util.byteable.ByteableList;
import org.mr.core.util.byteable.ByteableMap;
import org.mr.kernel.Plugin;
import org.mr.kernel.control.ControlSignalMessageSender;
import org.mr.kernel.services.MantaService;
import org.mr.kernel.services.ServiceActor;
import org.mr.kernel.services.ServiceActorInfoContainer;
import org.mr.kernel.services.ServiceConsumer;
import org.mr.kernel.services.ServiceProducer;
import org.mr.kernel.services.queues.QueueMaster;
import org.mr.kernel.services.queues.VirtualQueuesManager;
import org.mr.kernel.services.topics.VirtualTopicManager;
import org.mr.kernel.world.WorldModeler;

/**
 * Plugin that joins a multicast group and listens for announcements published
 * by other agents on the auto-discovery subject of the default domain.
 *
 * <p>Each announcement is a {@link ByteableMap} keyed by {@link #AGENT_NAME},
 * {@link #TRANSPORT_INFO}, {@link #SERVICE_ACTOR} and {@link #REMOVED_DURABLE}.
 * The last announcement received from every sender is remembered so that the
 * next one can be diffed against it: transports and service actors that
 * appeared are added, and those that disappeared are removed.</p>
 *
 * <p>Message handling is serialized through the {@code synchronized}
 * {@link #onMessage(GroupKey, String, MantaBusMessage)} method.</p>
 */
public class AutoDiscoveryPlugin implements Plugin, GroupMessageListener {

    /** Refresh interval; overwritten from configuration by the constructor. */
    public static long refreshInterval = 500;

    /** Multicast group key built from the configured IP and port by the constructor. */
    public static GroupKey groupKey = null;

    public static final String MANTA_GROUP_NAME = "manta";
    public static final String MANTA_AD_SUBJECT_NAME = "ad";

    // Keys of the announcement payload map.
    public static final String AGENT_NAME = "AGENT_NAME";
    public static final String SERVICE_ACTOR = "SERVICE_ACTOR";
    public static final String TRANSPORT_INFO = "TRANSPORT_INFO";
    public static final String REMOVED_DURABLE = "REMOVED_DURABLE";

    /** Offset added to the current time when (re)setting a coordinator's valid-until stamp. */
    private static final long COORDINATOR_VALIDITY_MILLIS = 5000;

    private MantaAgent manta = null;
    private MutlicastGroupManager groupsManager = null;
    private Log log = null;
    private ADControlSender adControlSender = null;
    /** Default sender that was installed before this plugin; restored by {@link #stop()}. */
    private ControlSignalMessageSender controlSignalMessageSender = null;
    /** Maps a message source to the last announcement payload received from it. */
    private Map addressTOAgentMap = null;
    private WorldModeler worldModeler = null;
    private Thread adControlSenderThread = null;

    /**
     * Reads the auto-discovery configuration, joins the multicast group and
     * registers this instance as listener on the auto-discovery subject of the
     * default domain. The sender thread is created here but only started by
     * {@link #start()}.
     *
     * @throws GroupsException if the multicast IP is not configured, if the
     *         multicast port cannot be read, or if thrown by the groups
     *         manager while joining or registering
     */
    public AutoDiscoveryPlugin() throws GroupsException {
        manta = MantaAgent.getInstance();
        ConfigManager config = manta.getSingletonRepository().getConfigManager();

        String multicastIPForAD = config.getStringProperty("plug-ins.auto-discovery.auto_discovery_multicast_ip");
        if (multicastIPForAD == null) {
            throw new GroupsException("missing configuration 'plug-ins.auto-discovery.auto_discovery_multicast_ip' ");
        }

        int multicastPortForAD = 0;
        try {
            multicastPortForAD = config.getIntProperty("plug-ins.auto-discovery.auto_discovery_multicast_port");
        } catch (Throwable t) {
            throw new GroupsException("error in configuration 'plug-ins.auto-discovery.auto_discovery_multicast_port'", t);
        }

        // "0.0.0.0" (also the default) means: let ADControlSender pick a local address.
        String multicastLocalBind = config.getStringProperty("plug-ins.auto-discovery.auto_discovery_local_interface", "0.0.0.0");
        if (multicastLocalBind.equals("0.0.0.0")) {
            multicastLocalBind = ADControlSender.getValidLocalAddress();
        }

        groupKey = new GroupKey(multicastIPForAD, multicastPortForAD);
        groupsManager = manta.getSingletonRepository().getGroupsManager();
        // Remember the current default sender so stop() can put it back.
        controlSignalMessageSender = manta.getSingletonRepository().getServiceActorControlCenter().getDefaultSender();
        worldModeler = manta.getSingletonRepository().getWorldModeler();
        adControlSender = new ADControlSender();
        addressTOAgentMap = new HashMap();
        log = LogFactory.getLog("AutoDiscoveryPlugin");

        groupsManager.joinGroup(groupKey, multicastLocalBind);
        groupsManager.registerListenerToSubject(groupKey, MANTA_AD_SUBJECT_NAME + worldModeler.getDefaultDomainName(), this);

        adControlSenderThread = new Thread(adControlSender);
        adControlSenderThread.setName("ADControlSenderThread");
        refreshInterval = config.getLongProperty("plug-ins.auto-discovery.auto_discovery_refresh_interval", refreshInterval);

        if (log.isInfoEnabled()) {
            log.info("Multicast Address: " + multicastIPForAD + ", Multicast Port: " + multicastPortForAD);
        }
    }

    /**
     * @return the fixed plugin name {@code "AutoDiscoveryPlugin"}
     */
    public String getName() {
        return "AutoDiscoveryPlugin";
    }

    /**
     * @return the fixed plugin version {@code 0.1f}
     */
    public float getVersion() {
        return 0.1f;
    }

    /**
     * Installs the auto-discovery sender as the default control-signal sender
     * and starts the sender thread.
     */
    public void start() {
        MantaAgent.getInstance().getSingletonRepository().getServiceActorControlCenter().setDefaultSender(adControlSender);
        adControlSenderThread.start();
    }

    /**
     * Restores the default control-signal sender that was installed before
     * this plugin was constructed.
     *
     * <p>NOTE(review): this method neither stops the sender thread nor leaves
     * the multicast group; confirm whether that is intentional.</p>
     */
    public void stop() {
        MantaAgent.getInstance().getSingletonRepository().getServiceActorControlCenter().setDefaultSender(controlSignalMessageSender);
    }

    /**
     * Handles one announcement received on the multicast group.
     *
     * <p>Messages whose source equals {@code ADControlSender.serviceProducer}
     * are ignored. For a sender seen for the first time the agent (or its
     * transports) and all announced service actors are registered. For a
     * known sender the announcement is diffed against the previous one and
     * only the differences are applied. In both cases the payload is stored
     * as the sender's latest announcement.</p>
     *
     * @param key     the group the message arrived on (not used)
     * @param subject the subject the message arrived on (not used)
     * @param msg     the announcement; its payload is cast to {@link ByteableMap}
     */
    public synchronized void onMessage(GroupKey key, String subject, MantaBusMessage msg) {
        if (ADControlSender.serviceProducer != null && msg.getSource().equals(ADControlSender.serviceProducer)) {
            return;
        }

        String defaultDomainName = worldModeler.getDefaultDomainName();
        ByteableMap infoMap = (ByteableMap) msg.getPayload();
        ByteableList newAgentTransportList = (ByteableList) infoMap.get(AutoDiscoveryPlugin.TRANSPORT_INFO);
        ByteableList serviceActorList = (ByteableList) infoMap.get(AutoDiscoveryPlugin.SERVICE_ACTOR);
        ByteableList removedDurable = (ByteableList) infoMap.get(AutoDiscoveryPlugin.REMOVED_DURABLE);
        String agentName = (String) infoMap.get(AutoDiscoveryPlugin.AGENT_NAME);

        Object senderMemberKey = msg.getSource();
        ByteableMap knownAgentInfo = (ByteableMap) addressTOAgentMap.get(senderMemberKey);

        // May be null when the world modeler has no entry for this agent.
        Set currentAgentTransportInfo = worldModeler.getAgentTransportInfo(defaultDomainName, agentName);

        if (knownAgentInfo == null) {
            // First announcement from this sender.
            addressTOAgentMap.put(senderMemberKey, infoMap);

            if (currentAgentTransportInfo == null) {
                if (log.isInfoEnabled()) {
                    log.info("Discovered a new MantaRay peer called " + agentName + " at " + newAgentTransportList);
                }
                worldModeler.addAgent(defaultDomainName, agentName, newAgentTransportList);
            } else {
                // The agent is already modeled: bring its transports in line with the announcement.
                removeAgentTransportInfo(currentAgentTransportInfo, newAgentTransportList, agentName, defaultDomainName);
                addAgentTransportInfo(currentAgentTransportInfo, newAgentTransportList, agentName, defaultDomainName);
            }

            addServiceActors(serviceActorList, defaultDomainName);
        } else {
            // Known sender: apply only what changed since its previous announcement.
            removeAgentTransportInfo(currentAgentTransportInfo, newAgentTransportList, agentName, defaultDomainName);
            addAgentTransportInfo(currentAgentTransportInfo, newAgentTransportList, agentName, defaultDomainName);

            ByteableList oldServiceActorList = (ByteableList) knownAgentInfo.get(AutoDiscoveryPlugin.SERVICE_ACTOR);
            removeServiceActors(serviceActorList, oldServiceActorList, defaultDomainName);
            addServiceActors(serviceActorList, oldServiceActorList, defaultDomainName);

            ByteableList oldRemovedDurableList = (ByteableList) knownAgentInfo.get(AutoDiscoveryPlugin.REMOVED_DURABLE);
            removeDurableActors(removedDurable, oldRemovedDurableList, defaultDomainName);

            addressTOAgentMap.put(senderMemberKey, infoMap);
        }
    }

    /**
     * Removes every service actor that was present in the previous
     * announcement but is missing from the new one.
     *
     * @param newServiceActorList actors in the new announcement
     * @param oldServiceActorList actors in the previous announcement
     * @param defaultDomainName   not used
     */
    private void removeServiceActors(ByteableList newServiceActorList, ByteableList oldServiceActorList, String defaultDomainName) {
        for (Iterator iter = oldServiceActorList.iterator(); iter.hasNext();) {
            ServiceActorInfoContainer element = (ServiceActorInfoContainer) iter.next();
            if (newServiceActorList.contains(element)) {
                continue;
            }

            ServiceActor actor = element.getActor();
            if (actor.getServiceType() == MantaService.SERVICE_TYPE_TOPIC && actor.getType() == ServiceActor.CONSUMER) {
                // Topic consumers are handled by the virtual topic manager.
                if (log.isInfoEnabled()) {
                    log.info("Removing service consumer " + actor);
                }
                manta.getSingletonRepository().getVirtualTopicManager().removeConsumer((ServiceConsumer) actor);
                continue;
            }

            MantaService service = MantaAgent.getInstance().getService(actor.getServiceName(), actor.getServiceType());
            if (service == null) {
                continue;
            }

            if (actor.getType() == ServiceActor.CONSUMER) {
                if (log.isInfoEnabled()) {
                    log.info("Removing service consumer " + actor);
                }
                service.removeConsumer((ServiceConsumer) actor);
            } else if (actor.getType() == ServiceActor.PRODUCER) {
                if (log.isInfoEnabled()) {
                    log.info("Removing service producer " + actor);
                }
                service.removeProducer((ServiceProducer) actor);
            } else {
                // Neither consumer nor producer: treated as a queue coordinator.
                // Only clear the queue master if it belongs to the same agent as the removed actor.
                VirtualQueuesManager vqm = manta.getSingletonRepository().getVirtualQueuesManager();
                QueueMaster qm = vqm.getQueueMaster(service.getServiceName());
                if (qm != null && qm.getAgentName().equals(actor.getAgentName())) {
                    if (log.isInfoEnabled()) {
                        log.info("Removing service coordinator " + actor);
                    }
                    vqm.setQueueMaster(service.getServiceName(), null);
                }
            }
        }
    }

    /**
     * Removes durable consumers that are listed in the new announcement's
     * removed-durable list but were not listed in the previous one.
     *
     * @param newRemovedDurable removed-durable list of the new announcement; may be null
     * @param oldRemovedDurable removed-durable list of the previous announcement;
     *                          may be null, in which case every new entry is processed
     * @param defaultDomainName not used
     */
    private void removeDurableActors(ByteableList newRemovedDurable, ByteableList oldRemovedDurable, String defaultDomainName) {
        if (newRemovedDurable == null) {
            return;
        }

        for (Iterator iter = newRemovedDurable.iterator(); iter.hasNext();) {
            ServiceActorInfoContainer element = (ServiceActorInfoContainer) iter.next();
            // A missing previous list means nothing has been processed yet.
            if (oldRemovedDurable != null && oldRemovedDurable.contains(element)) {
                continue;
            }

            ServiceActor actor = element.getActor();
            MantaService service = MantaAgent.getInstance().getService(actor.getServiceName(), actor.getServiceType());
            if (service != null && actor.getType() == ServiceActor.CONSUMER) {
                VirtualTopicManager topicManager = MantaAgent.getInstance().getSingletonRepository().getVirtualTopicManager();
                if (log.isInfoEnabled()) {
                    log.info("Removing durable service consumer " + actor);
                }
                topicManager.removeDurableConsumer(service.getServiceName(), (ServiceConsumer) actor);
            }
        }
    }

    /**
     * Registers every announced service actor. Used for the first
     * announcement of a sender, when there is nothing to diff against.
     *
     * @param serviceActorList actors in the announcement
     * @param domainName       not used
     */
    private void addServiceActors(ByteableList serviceActorList, String domainName) {
        for (Iterator iter = serviceActorList.iterator(); iter.hasNext();) {
            ServiceActorInfoContainer element = (ServiceActorInfoContainer) iter.next();
            ServiceActor actor = element.getActor();

            if (actor.getServiceType() == MantaService.SERVICE_TYPE_TOPIC && actor.getType() == ServiceActor.CONSUMER) {
                if (log.isInfoEnabled()) {
                    log.info("Discovered service consumer " + actor);
                }
                MantaAgent.getInstance().getSingletonRepository().getVirtualTopicManager().addConsumer((ServiceConsumer) actor);
                continue;
            }

            MantaService service = MantaAgent.getInstance().getService(actor.getServiceName(), actor.getServiceType());
            if (service == null) {
                continue;
            }

            if (actor.getType() == ServiceActor.CONSUMER) {
                if (log.isInfoEnabled()) {
                    log.info("Discovered service consumer " + actor);
                }
                service.addConsumer((ServiceConsumer) actor);
            } else if (actor.getType() == ServiceActor.PRODUCER) {
                if (log.isInfoEnabled()) {
                    log.info("Discovered service producer " + actor);
                }
                service.addProducer((ServiceProducer) actor);
            } else {
                // Neither consumer nor producer: the actor is cast to QueueMaster
                // and unconditionally installed as the queue's coordinator.
                QueueMaster coordinator = (QueueMaster) actor;
                refreshValidity(coordinator);
                if (log.isInfoEnabled()) {
                    log.info("Discovered service coordinator " + actor);
                }
                manta.getSingletonRepository().getVirtualQueuesManager().setQueueMaster(service.getServiceName(), coordinator);
            }
        }
    }

    /**
     * Registers the service actors of a new announcement that were not part
     * of the previous one. Coordinators are reconciled on every announcement,
     * whether or not they were announced before.
     *
     * @param serviceActorList    actors in the new announcement
     * @param oldServiceActorList actors in the previous announcement
     * @param defaultDomainName   not used
     */
    private void addServiceActors(ByteableList serviceActorList, ByteableList oldServiceActorList, String defaultDomainName) {
        for (Iterator iter = serviceActorList.iterator(); iter.hasNext();) {
            ServiceActorInfoContainer element = (ServiceActorInfoContainer) iter.next();
            ServiceActor actor = element.getActor();

            if (actor.getServiceType() == MantaService.SERVICE_TYPE_TOPIC && actor.getType() == ServiceActor.CONSUMER) {
                if (!oldServiceActorList.contains(element)) {
                    if (log.isInfoEnabled()) {
                        log.info("Discovered service consumer " + actor);
                    }
                    MantaAgent.getInstance().getSingletonRepository().getVirtualTopicManager().addConsumer((ServiceConsumer) actor);
                }
                continue;
            }

            MantaService service = MantaAgent.getInstance().getService(actor.getServiceName(), actor.getServiceType());
            if (service == null) {
                continue;
            }

            if (actor.getType() == ServiceActor.CONSUMER) {
                if (!oldServiceActorList.contains(element)) {
                    if (log.isInfoEnabled()) {
                        log.info("Discovered service consumer " + actor);
                    }
                    service.addConsumer((ServiceConsumer) actor);
                }
            } else if (actor.getType() == ServiceActor.PRODUCER) {
                if (!oldServiceActorList.contains(element)) {
                    if (log.isInfoEnabled()) {
                        log.info("Discovered service producer " + actor);
                    }
                    service.addProducer((ServiceProducer) actor);
                }
            } else {
                reconcileCoordinator(service, actor);
            }
        }
    }

    /**
     * Reconciles an announced coordinator with the queue master currently
     * registered for the service: installs it when none is registered,
     * replaces a different one (recalling it if it belongs to this agent),
     * or just extends the validity of the registered one when they are equal.
     *
     * @param service the service the coordinator was announced for
     * @param actor   the announced coordinator; cast to {@link QueueMaster}
     */
    private void reconcileCoordinator(MantaService service, ServiceActor actor) {
        VirtualQueuesManager vqm = manta.getSingletonRepository().getVirtualQueuesManager();
        QueueMaster current = vqm.getQueueMaster(service.getServiceName());

        if (current == null) {
            if (log.isInfoEnabled()) {
                log.info("Discovered service coordinator (no coordinator defined yet): " + actor);
                log.info("Setting the newly discovered coordinator as queue coordinator: " + actor);
            }
            QueueMaster coordinator = (QueueMaster) actor;
            refreshValidity(coordinator);
            vqm.setQueueMaster(service.getServiceName(), coordinator);
        } else if (!current.equals(actor)) {
            if (log.isInfoEnabled()) {
                log.info("Discovered service coordinator (another coordinator is already defined): found: " + actor + " existing: " + current);
                log.info("Setting the newly discovered coordinator as queue coordinator: " + actor);
            }
            QueueMaster coordinator = (QueueMaster) actor;
            refreshValidity(coordinator);
            vqm.setQueueMaster(service.getServiceName(), coordinator);

            // The replaced coordinator was ours: recall it so only one remains.
            if (current.getAgentName().equals(this.manta.getAgentName())) {
                if (log.isInfoEnabled()) {
                    log.info("Recalling current coordinator to resolve the conflict: " + current);
                }
                try {
                    this.manta.recallService(current);
                } catch (MantaException e) {
                    if (log.isErrorEnabled()) {
                        // Pass the exception as well so the stack trace is logged.
                        log.error("Error while recalling coordinator. " + e, e);
                    }
                }
            }
        } else {
            // Same coordinator announced again: extend the validity of the
            // registered instance. 'current' is known to be non-null here, so
            // no second lookup (which could return null) is needed.
            refreshValidity(current);
        }
    }

    /**
     * Sets the coordinator's valid-until stamp to the current system time
     * plus {@link #COORDINATOR_VALIDITY_MILLIS}.
     */
    private void refreshValidity(QueueMaster coordinator) {
        coordinator.setValidUntil(SystemTime.currentTimeMillis() + COORDINATOR_VALIDITY_MILLIS);
    }

    /**
     * Adds to the world model every announced transport that the agent does
     * not already have.
     *
     * @param currentAgentTransportInfo transports currently modeled for the agent; may be null
     * @param newAgentTransportList     transports in the new announcement
     * @param agentName                 name of the announcing agent
     * @param defaultDomainName         domain the agent is modeled in
     * @return {@code true} if at least one transport was added
     */
    private boolean addAgentTransportInfo(Set currentAgentTransportInfo, ByteableList newAgentTransportList, String agentName, String defaultDomainName) {
        boolean result = false;

        for (int i = 0; i < newAgentTransportList.size(); i++) {
            TransportInfo newInfo = (TransportInfo) newAgentTransportList.get(i);
            if (currentAgentTransportInfo == null || !currentAgentTransportInfo.contains(newInfo)) {
                if (log.isInfoEnabled()) {
                    log.info("Discovered transport for peer " + agentName + ", transport " + newInfo);
                }
                worldModeler.addTransportInfoToAgent(defaultDomainName, agentName, newInfo);
                result = true;
            }
        }
        return result;
    }

    /**
     * Removes from the world model every transport of the agent that is not
     * part of the new announcement. Transports of type
     * {@code TransportType.MWB} are never removed.
     *
     * @param currentAgentTransportInfo transports currently modeled for the agent;
     *                                  may be null, in which case nothing is removed
     * @param newAgentTransportList     transports in the new announcement
     * @param agentName                 name of the announcing agent
     * @param domainName                domain the agent is modeled in
     * @return {@code true} if at least one transport was removed
     */
    private boolean removeAgentTransportInfo(Collection currentAgentTransportInfo, List newAgentTransportList, String agentName, String domainName) {
        if (currentAgentTransportInfo == null) {
            // The agent has no modeled transports; addAll(null) would throw.
            return false;
        }

        boolean result = false;
        // Iterate over a copy: the removal call below may change the collection we were handed.
        List copy = new ArrayList(currentAgentTransportInfo);

        for (Iterator i = copy.iterator(); i.hasNext();) {
            TransportInfo info = (TransportInfo) i.next();
            if (!newAgentTransportList.contains(info) && info.getTransportInfoType() != TransportType.MWB) {
                if (log.isInfoEnabled()) {
                    log.info("Removed transport for peer " + agentName + ", transport " + info);
                }
                worldModeler.removeTransportInfoFromAgent(domainName, agentName, info);
                result = true;
            }
        }
        return result;
    }
}