1 46 47 package org.mr.indexing; 48 49 import java.util.HashSet ; 50 import java.util.Iterator ; 51 import java.util.Set ; 52 import java.util.Collection ; 53 54 import org.apache.commons.logging.Log; 55 import org.apache.commons.logging.LogFactory; 56 import org.mr.MantaAgent; 57 import org.mr.MantaAgentConstants; 58 import org.mr.core.protocol.DeadEndRecipient; 59 import org.mr.core.protocol.MantaBusMessage; 60 import org.mr.core.protocol.MantaBusMessageConsts; 61 import org.mr.core.util.TimeoutTimer; 62 import org.mr.core.util.Timeoutable; 63 import org.mr.core.net.AgentStateListener; 64 import org.mr.core.net.NetworkManager; 65 import org.mr.core.net.LocalTransport; 66 import org.mr.core.net.TransportInfo; 67 import org.mr.core.net.SimpleMantaAddress; 68 import org.mr.indexing.messages.AgentRoleChanged; 69 import org.mr.indexing.messages.AgentTransportsChanged; 70 import org.mr.indexing.messages.MWBMessageConsts; 71 import org.mr.kernel.services.ServiceActor; 72 import org.mr.kernel.world.WorldModeler; 73 74 80 public class WBAdvertiser implements AgentStateListener, Timeoutable { 81 private Log log; 82 private Set actors; 83 private TimeoutTimer timer; 84 private int leaseTimeSec; 85 private boolean advertisedTransports; 86 private boolean connectedOnce; 87 88 public WBAdvertiser(int leaseTimeSec) { 89 this.log = LogFactory.getLog("WBAdvertiser"); 90 this.actors = new HashSet (); 91 this.timer = new TimeoutTimer("WBAdvertiser",1000, 1); this.leaseTimeSec = leaseTimeSec; 93 this.advertisedTransports = false; 94 this.connectedOnce = false; 95 if (this.log.isInfoEnabled()) { 96 this.log.info("Advertising using WBLink"); 97 } 98 } 100 public void advertiseService(ServiceActor actor, MantaAgent agent) { 101 if (!advertisedTransports) { 102 advertiseTransports(); 103 } 104 if (this.log.isInfoEnabled()) { 105 this.log.info("Advertising " + actor + " using WBLink"); 106 } 107 synchronized (this.actors) { 108 boolean startAdvertising = actors.isEmpty(); 109 this.actors.add(actor); 110 if (startAdvertising) { 111 startTimer(); 112 } 113 } 114 sendMWBMessage(actor, MWBMessageConsts.OP_ADD, agent); 115 } 116 117 public void recallService(ServiceActor actor, MantaAgent agent) { 118 if (!advertisedTransports) { 119 advertiseTransports(); 120 } 121 if (this.log.isInfoEnabled()) { 122 this.log.info("Recalling " + actor + " using WBLink"); 123 } 124 synchronized (this.actors) { 125 this.actors.remove(actor); 126 if (this.actors.isEmpty()) { 127 stopTimer(); 128 } 129 } 130 sendMWBMessage(actor, MWBMessageConsts.OP_REMOVE, agent); 131 } 132 133 public void recallDurableSubscription(ServiceActor actor, 134 MantaAgent agent) { 135 if (!advertisedTransports) { 136 advertiseTransports(); 137 } 138 if (this.log.isInfoEnabled()) { 139 this.log.info("Recalling durable " + actor + " using WBLink"); 140 } 141 synchronized (this.actors) { 142 this.actors.remove(actor); 143 if (this.actors.isEmpty()) { 144 stopTimer(); 145 } 146 } 147 sendMWBMessage(actor, MWBMessageConsts.OP_REMOVE_DURABLE, agent); 148 } 149 150 public void timeout(Object event) { 151 advertiseActors(); 152 startTimer(); 153 } 154 155 private void startTimer() { 156 this.timer.addTimeout(this, this, (long) leaseTimeSec * 500); 157 } 158 159 private void stopTimer() { 160 this.timer.removeTimeout(this); 161 } 162 163 private void advertiseActors() { 164 synchronized (this.actors) { 165 Iterator i = this.actors.iterator(); 166 while (i.hasNext()) { 167 ServiceActor actor = (ServiceActor) i.next(); 168 sendMWBMessage(actor, MWBMessageConsts.OP_ADD, 169 MantaAgent.getInstance()); 170 } 171 } 172 } 173 174 private void advertiseTransports() { 175 MantaAgent agent = MantaAgent.getInstance(); 176 NetworkManager network = 177 agent.getSingletonRepository().getNetworkManager(); 178 String agentName = agent.getAgentName(); 179 String domainName = agent.getDomainName(); 180 Collection transports = network.getLocalTransports(); 181 182 if (transports != null) { 183 Set correctTransports = new HashSet (); 185 Iterator i = transports.iterator(); 186 while (i.hasNext()) { 187 TransportInfo info = ((LocalTransport) i.next()).getInfo(); 188 if (info != null) { 189 if (info.getIp().getHostAddress().equals("0.0.0.0")) { 190 String validLocal = 191 TransportInfo.getValidLocalAddress(); 192 info = new TransportInfo(validLocal, info.getPort(), 193 info.getTransportInfoType(). 194 toString()); 195 } 196 correctTransports.add(info); 197 } 198 } 199 200 AgentTransportsChanged atc = new AgentTransportsChanged(); 201 atc.setAgentName(agentName); 202 atc.setDomainName(domainName); 203 atc.setAddedTransports(correctTransports); 204 atc.setCleanAll(true); 205 MantaBusMessage message = MantaBusMessage.getInstance(); 206 207 DeadEndRecipient resp = 208 DeadEndRecipient.createDeadEndRecipient("mwb", domainName); 209 message.setPayload(atc); 210 message.setRecipient(resp); 211 message.addHeader(MWBMessageConsts.MWB_TYPE, 212 AgentTransportsChanged.getTypeString()); 213 message.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CONTROL); 214 message.setDeliveryMode(MantaAgentConstants.NON_PERSISTENT); 215 216 217 agent.send(message, new SimpleMantaAddress(agentName, domainName)); 218 if (this.log.isDebugEnabled()) { 219 this.log.debug("Sent " + atc); 220 } 221 this.advertisedTransports = true; 222 } 223 } 224 225 private void sendMWBMessage(ServiceActor actor, byte operation, 226 MantaAgent agent) 227 { 228 WorldModeler world = agent.getSingletonRepository().getWorldModeler(); 229 String domainName = world.getDefaultDomainName(); 230 231 AgentRoleChanged arc = new AgentRoleChanged(); 232 233 arc.setServiceActor(actor); 234 arc.setOperation(operation); 236 arc.setDomainName(domainName); 237 arc.setLease(this.leaseTimeSec); 238 239 MantaBusMessage message = MantaBusMessage.getInstance(); 240 241 DeadEndRecipient resp = 242 DeadEndRecipient.createDeadEndRecipient("mwb", domainName); 243 244 message.setPayload(arc); 247 message.setRecipient(resp); 248 message.addHeader(MWBMessageConsts.MWB_TYPE, 249 AgentRoleChanged.getTypeString()); 250 message.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CONTROL); 251 message.setDeliveryMode(MantaAgentConstants.NON_PERSISTENT); 252 253 agent.send(message, arc.getServiceActor()); 254 } 255 256 259 public void agentStateChanged(String agent, int state) { 260 if (state == AgentStateListener.AGENT_STATE_DOWN && 261 this.connectedOnce) { 262 if (this.log.isWarnEnabled()) { 263 this.log.warn("Connection to WB is down. No role advertising."); 264 } 265 } else if (state == AgentStateListener.AGENT_STATE_UP) { 266 this.connectedOnce = true; 267 if (this.log.isInfoEnabled()) { 268 this.log.info("Connection to WB is up"); 269 } 270 advertiseTransports(); 271 } 272 } 273 } | Popular Tags |