| 1 46 package org.mr.plugins.coordinators; 47 48 import java.util.ArrayList ; 49 import java.util.Collection ; 50 import java.util.Iterator ; 51 import java.util.Set ; 52 53 import org.apache.commons.logging.Log; 54 import org.apache.commons.logging.LogFactory; 55 import org.mr.MantaAgent; 56 import org.mr.api.jms.MantaConnection; 57 import org.mr.core.configuration.ConfigurationElement; 58 import org.mr.core.util.SystemTime; 59 import org.mr.kernel.services.MantaService; 60 import org.mr.kernel.services.queues.QueueMaster; 61 import org.mr.kernel.world.WorldModeler; 62 63 70 public class DynamicQueuesCoordinator extends Thread { 71 public static final String COORDINATED_QUEUES_KEY = "plug-ins.queues-coordinator.queue"; 72 public static final String EXCLUDE_QUEUES_KEY = "plug-ins.queues-coordinator.exclude_prefix"; 73 74 public static Log log; 75 boolean go = true; 76 public static MantaAgent agent; 77 public static WorldModeler world; 78 79 public static ArrayList excludeList = new ArrayList (); 80 81 public DynamicQueuesCoordinator(){ 82 log=LogFactory.getLog("DynamicQueuesCoordinator"); 83 agent = QueuesCoordinator.agent; 84 world = agent.getSingletonRepository().getWorldModeler(); 85 this.setName("DynamicQueuesCoordinator"); 86 ArrayList queueList= MantaAgent.getInstance() 88 .getSingletonRepository().getConfigManager() 89 .getConfigurationElements(EXCLUDE_QUEUES_KEY); 90 91 if(queueList!= null && queueList.size()>0 ){ 93 for (int i = 0; i < queueList.size(); i++) { 94 excludeList.add(((ConfigurationElement)queueList.get(i)).getValue()) ; 95 } 96 } 97 } 98 101 public void run(){ 102 103 while(go){ 104 long rand = System.currentTimeMillis(); 106 rand =rand%3000; 107 rand = rand +2000; 108 try { 109 sleep(rand); 110 Set servicesSet =world.getServices(world.getDefaultDomainName()); 111 if(servicesSet != null){ 112 Iterator services =servicesSet.iterator(); 113 int i = 0; 114 while(services.hasNext()){ 115 i++; 116 MantaService service = (MantaService) services.next(); 117 if(service.getServiceType() ==MantaService.SERVICE_TYPE_QUEUE ){ 118 QueueMaster coordinator= agent.getSingletonRepository().getVirtualQueuesManager().getQueueMaster(service.getServiceName()); 120 if(coordinator == null || (coordinator.getValidUntil()<SystemTime.currentTimeMillis())){ 121 Collection producers = service.getProducersByAgentId(agent.getAgentName()); 123 if(producers != null && producers.size() >0 && !isExcluded(service.getServiceName())){ 124 QueueMaster myCoordinator = new QueueMaster(agent.getAgentName(),service.getServiceName() ); 125 if (log.isInfoEnabled()) { 126 log.info("Starting to coordinate queue "+myCoordinator); 127 } 128 agent.advertiseService(myCoordinator); 129 QueuesCoordinator.queueToCoordinatorMap.put(service ,myCoordinator ); 130 } 131 } } } } } catch (Exception e) { 136 if (log.isErrorEnabled()) { 137 log.error("Error in DynamicQueuesCoordinator",e); 138 } 139 } } } 143 144 public static boolean isExcluded(String queueName){ 145 if(queueName.startsWith(MantaConnection.TMP_DESTINATION_PREFIX)){ 146 return true; 147 } 148 for (int i = 0; i < excludeList.size(); i++) { 149 String exclude = (String ) excludeList.get(i); 150 if(exclude != null && queueName.startsWith(exclude)){ 151 return true; 152 } 153 } 154 return false; 155 } 156 157 } 158 | Popular Tags |