1 46 51 package org.mr.plugins.coordinators; 52 53 import java.util.ArrayList ; 54 import java.util.HashMap ; 55 56 import org.mr.MantaAgent; 57 import org.apache.commons.logging.Log; 58 import org.apache.commons.logging.LogFactory; 59 import org.mr.core.configuration.ConfigurationElement; 60 import org.mr.kernel.Plugin; 61 import org.mr.kernel.services.MantaService; 62 import org.mr.kernel.services.queues.QueueMaster; 63 64 70 public class QueuesCoordinator implements Plugin { 71 72 public static final String COORDINATED_QUEUES_KEY = "plug-ins.queues-coordinator.queue"; 73 public static final String DYNAMICLY_COORDINATED = "DYNAMICALLY_COORDINATED"; 74 public static MantaAgent agent; 75 public static String [] myCoordinatedQueuesName = null; 76 public static HashMap queueToCoordinatorMap = new HashMap (); 77 public static Log log; 78 DynamicQueuesCoordinator dqc; 79 80 81 public QueuesCoordinator(){ 82 agent= MantaAgent.getInstance(); 83 ArrayList queueList= MantaAgent.getInstance() 84 .getSingletonRepository().getConfigManager() 85 .getConfigurationElements(COORDINATED_QUEUES_KEY); 86 87 if(queueList!= null && queueList.size()>0 ){ 89 myCoordinatedQueuesName = new String [queueList.size()]; 90 for (int i = 0; i < queueList.size(); i++) { 91 myCoordinatedQueuesName[i]= ((ConfigurationElement)queueList.get(i)).getValue(); 92 } 93 } 94 log=LogFactory.getLog("QueuesCoordinator"); 95 } 96 99 public String getName() { 100 101 return "QueuesCoordinator"; 102 } 103 104 107 public float getVersion() { 108 109 return 1; 110 } 111 112 116 public void start() { 117 if(QueuesCoordinator.myCoordinatedQueuesName == null ||QueuesCoordinator.myCoordinatedQueuesName.length ==0 ){ 118 if(log.isDebugEnabled()) { 119 log.debug("coordinated_queues is null or empty -no Queues to coordinated"); 120 } 121 return; 122 } 123 for (int i = 0; i < QueuesCoordinator.myCoordinatedQueuesName.length; i++) { 125 String queueName = QueuesCoordinator.myCoordinatedQueuesName[i]; 126 if(queueName ==null ||queueName.length()==0 ){ 127 if (log.isInfoEnabled()) { 129 log.info("Ignoring queue with empty name, possible configuration error in queues-coordinator."); 130 } 131 continue; 132 } 133 queueName = queueName.trim(); 134 myCoordinatedQueuesName[i] = queueName; 135 136 try{ 137 if(queueName.equals(DYNAMICLY_COORDINATED)){ 138 if (log.isInfoEnabled()) { 139 log.info("Starting DynamicQueuesCoordinator"); 140 } 141 dqc = new DynamicQueuesCoordinator(); 142 dqc.start(); 143 continue; 144 } 145 if (log.isDebugEnabled()) { 146 log.debug("Starting to coordinate queue "+queueName); 147 } 148 MantaService queue = agent.getService(queueName, MantaService.SERVICE_TYPE_QUEUE); 149 QueueMaster coordinator = new QueueMaster(agent.getAgentName(),queueName ); 150 agent.advertiseService(coordinator); 151 queueToCoordinatorMap.put(queue ,coordinator ); 152 }catch(Exception e){ 153 if (log.isErrorEnabled()) { 154 log.error("Error in starting queue coordination "+queueName,e); 155 } 156 } 157 } } 159 160 163 public void stop() { 164 if(myCoordinatedQueuesName == null ||myCoordinatedQueuesName.length ==0 ){ 165 return; 167 } 168 for (int i = 0; i < myCoordinatedQueuesName.length; i++) { 170 String queueName = myCoordinatedQueuesName[i]; 171 try{ 172 if (log.isDebugEnabled()) { 173 log.debug("Recalling queue coordinator "+queueName); 174 } 175 MantaService queue = agent.getService(queueName, MantaService.SERVICE_TYPE_QUEUE); 176 QueueMaster coordinator =(QueueMaster)queueToCoordinatorMap.get(queue); 177 agent.recallService(coordinator); 178 queueToCoordinatorMap.remove(queue); 179 }catch(Exception e){ 180 if (log.isErrorEnabled()) { 181 log.error("Error in recalling queue coordinator "+queueName,e); 182 } 183 } 184 } } 186 } 187 | Popular Tags |