package org.mr.indexing;

import java.util.Iterator;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mr.MantaAgent;
import org.mr.core.net.TransportInfo;
import org.mr.core.protocol.MantaBusMessage;
import org.mr.indexing.messages.AgentTransportsChanged;
import org.mr.indexing.messages.MWBMessageConsts;
import org.mr.indexing.messages.ServiceParticipationChanged;
import org.mr.kernel.services.MantaService;
import org.mr.kernel.services.ServiceActor;
import org.mr.kernel.services.ServiceConsumer;
import org.mr.kernel.services.ServiceProducer;
import org.mr.kernel.services.queues.QueueMaster;
import org.mr.kernel.services.queues.VirtualQueuesManager;
import org.mr.kernel.services.topics.VirtualTopicManager;
import org.mr.kernel.world.WorldModeler;

/**
 * Applies incoming change notifications to the local agent's state.
 *
 * <p>Two payload kinds are handled: {@link AgentTransportsChanged} (transports
 * added to / removed from an agent in the {@link WorldModeler}) and
 * {@link ServiceParticipationChanged} (actors added to / removed from a queue
 * or topic service). Messages of any other type are logged and ignored.
 *
 * <p>All updates are applied to the in-memory objects reached through
 * {@link MantaAgent#getInstance()}; this class does not persist anything
 * itself.
 */
public class WBHandler {
    private final Log log;

    /** Creates a handler that logs under the name {@code "WBHandler"}. */
    public WBHandler() {
        this.log = LogFactory.getLog("WBHandler");
    }

    /**
     * Dispatches a received message to the handler matching its type.
     *
     * @param message     the received message; its payload is cast to the
     *                    class implied by {@code messageType}
     * @param messageType the type string of the message; a {@code null} or
     *                    unrecognised value is logged at WARN level and the
     *                    message is ignored
     */
    public void messageArrived(MantaBusMessage message, String messageType) {
        if (this.log.isDebugEnabled()) {
            // Plain concatenation renders a null payload as "null", so turning
            // debug logging on can never change how the message is processed.
            this.log.debug("Handling " + message.getPayload());
        }

        if (messageType != null
                && messageType.equals(AgentTransportsChanged.getTypeString())) {
            AgentTransportsChanged atc =
                    (AgentTransportsChanged) message.getPayload();
            agentTransportsChanged(atc);
        } else if (messageType != null
                && messageType.equals(ServiceParticipationChanged.getTypeString())) {
            ServiceParticipationChanged spc =
                    (ServiceParticipationChanged) message.getPayload();
            serviceParticipationChanged(spc);
        } else {
            if (this.log.isWarnEnabled()) {
                this.log.warn("Ignoring IRS message with unknown type: " +
                        messageType);
            }
        }
    }

    /**
     * Applies the added and removed transports carried by {@code atc} to the
     * named agent in the world model. Additions are processed before removals.
     *
     * @param atc the transport change notification
     */
    private void agentTransportsChanged(AgentTransportsChanged atc) {
        MantaAgent agent = MantaAgent.getInstance();
        WorldModeler world = agent.getSingletonRepository().getWorldModeler();
        String agentName = atc.getAgentName();
        String domainName = atc.getDomainName();
        List added = atc.getAddedTransports();
        List removed = atc.getRemovedTransports();

        addTransports(world, domainName, agentName, added);
        removeTransports(world, domainName, agentName, removed);
    }

    /**
     * Adds every {@link TransportInfo} in {@code transports} to the agent and
     * logs (INFO) each one the world model reports as actually added.
     */
    private void addTransports(WorldModeler world, String domainName,
            String agentName, List transports) {
        Iterator it = transports.iterator();
        while (it.hasNext()) {
            TransportInfo info = (TransportInfo) it.next();
            boolean added =
                    world.addTransportInfoToAgent(domainName, agentName, info);
            if (added && this.log.isInfoEnabled()) {
                this.log.info("Added transport for " + agentName + ": " + info);
            }
        }
    }

    /**
     * Removes every {@link TransportInfo} in {@code transports} from the agent
     * and logs (INFO) each one the world model reports as actually removed.
     */
    private void removeTransports(WorldModeler world, String domainName,
            String agentName, List transports) {
        Iterator it = transports.iterator();
        while (it.hasNext()) {
            TransportInfo info = (TransportInfo) it.next();
            boolean removed =
                    world.removeTransportInfoFromAgent(domainName, agentName, info);
            if (removed && this.log.isInfoEnabled()) {
                this.log.info("Removed transport for " + agentName + ": " +
                        info);
            }
        }
    }

    /**
     * Applies a service participation change to the local service objects.
     *
     * <p>The service is looked up by name as a queue or a topic according to
     * the notification's service type; if the type is neither, or the lookup
     * yields {@code null}, the notification is silently ignored. Otherwise
     * each actor in the notification is processed according to the operation:
     * {@link MWBMessageConsts#OP_ADD}, {@link MWBMessageConsts#OP_REMOVE}, or
     * (for any other operation value) durable-consumer removal.
     *
     * @param spc the participation change notification
     */
    private void serviceParticipationChanged(ServiceParticipationChanged spc) {
        MantaAgent agent = MantaAgent.getInstance();
        WorldModeler world = agent.getSingletonRepository().getWorldModeler();
        String domainName = spc.getDomainName();
        String serviceName = spc.getServiceName();
        String serviceType = spc.getServiceType();
        byte operation = spc.getOperation();

        MantaService service = null;
        if (serviceType.equals(ServiceParticipationChanged.QUEUE)) {
            service = agent.getService(serviceName,
                    MantaService.SERVICE_TYPE_QUEUE);
        } else if (serviceType.equals(ServiceParticipationChanged.TOPIC)) {
            service = agent.getService(serviceName,
                    MantaService.SERVICE_TYPE_TOPIC);
        }
        if (service == null) {
            return;
        }

        Set actors = spc.getActors();
        Iterator it = actors.iterator();
        while (it.hasNext()) {
            ServiceActor actor = (ServiceActor) it.next();
            if (operation == MWBMessageConsts.OP_ADD) {
                // Register the actor's transports first, then the actor itself.
                addTransports(world, domainName, actor.getAgentName(),
                        spc.getTransports(actor));
                addActor(service, serviceName, actor);
            } else if (operation == MWBMessageConsts.OP_REMOVE) {
                removeActor(service, serviceName, actor);
            } else {
                removeDurableActor(serviceName, actor);
            }
        }
    }

    /**
     * Registers {@code actor} with the right owner: topic consumers go to the
     * virtual topic manager, other consumers and producers go to
     * {@code service}, and any remaining actor is treated as the
     * {@link QueueMaster} of {@code serviceName}.
     */
    private void addActor(MantaService service, String serviceName,
            ServiceActor actor) {
        if (this.log.isInfoEnabled()) {
            this.log.info("Adding actor " + actor);
        }

        if ((actor.getServiceType() == MantaService.SERVICE_TYPE_TOPIC)
                && (actor.getType() == ServiceActor.CONSUMER)) {
            MantaAgent.getInstance().getSingletonRepository()
                    .getVirtualTopicManager().addConsumer((ServiceConsumer) actor);
        } else if (actor.getType() == ServiceActor.CONSUMER) {
            service.addConsumer((ServiceConsumer) actor);
        // NOTE(review): PRODUCER is referenced through ServiceConsumer while
        // CONSUMER is referenced through ServiceActor - presumably the same
        // family of constants; confirm and unify.
        } else if (actor.getType() == ServiceConsumer.PRODUCER) {
            service.addProducer((ServiceProducer) actor);
        } else {
            // Neither consumer nor producer: the actor is cast to QueueMaster.
            QueueMaster master = (QueueMaster) actor;
            // NOTE(review): Long.MAX_VALUE presumably means "never expires" -
            // confirm against QueueMaster.setValidUntil.
            master.setValidUntil(Long.MAX_VALUE);
            VirtualQueuesManager vqm = MantaAgent.getInstance()
                    .getSingletonRepository().getVirtualQueuesManager();
            vqm.setQueueMaster(serviceName, master);
        }
    }

    /**
     * Unregisters {@code actor}: topic consumers from the virtual topic
     * manager, other consumers and producers from {@code service}; for any
     * remaining actor the queue master of {@code serviceName} is cleared.
     */
    private void removeActor(MantaService service, String serviceName,
            ServiceActor actor) {
        if (this.log.isInfoEnabled()) {
            this.log.info("Removing actor " + actor);
        }

        if (actor.getType() == ServiceActor.CONSUMER) {
            if (actor.getServiceType() == MantaService.SERVICE_TYPE_TOPIC) {
                MantaAgent.getInstance().getSingletonRepository()
                        .getVirtualTopicManager().removeConsumer((ServiceConsumer) actor);
            } else {
                service.removeConsumer((ServiceConsumer) actor);
            }
        } else if (actor.getType() == ServiceConsumer.PRODUCER) {
            service.removeProducer((ServiceProducer) actor);
        } else {
            VirtualQueuesManager vqm = MantaAgent.getInstance()
                    .getSingletonRepository().getVirtualQueuesManager();
            vqm.setQueueMaster(serviceName, null);
        }
    }

    /**
     * Removes {@code actor} as a durable consumer of {@code serviceName}.
     * Actors that are not consumers are only logged.
     */
    private void removeDurableActor(String serviceName, ServiceActor actor) {
        if (this.log.isInfoEnabled()) {
            this.log.info("Removing durable actor " + actor);
        }

        if (actor.getType() == ServiceActor.CONSUMER) {
            VirtualTopicManager topicManager = MantaAgent.getInstance()
                    .getSingletonRepository().getVirtualTopicManager();
            topicManager.removeDurableConsumer(serviceName,
                    (ServiceConsumer) actor);
        }
    }
}