package org.mr.core.net;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.mr.MantaAgent;
import org.mr.core.configuration.ConfigManager;
import org.mr.core.util.ActiveObject;
import org.mr.core.util.Stage;
import org.mr.core.util.StageHandler;
import org.mr.core.util.StageParams;
import org.mr.core.util.TimeoutTimer;
import org.mr.core.util.Timeoutable;

/**
 * Keeps one {@link AgentMonitor} per agent name and a shared set of
 * transports on which {@code keepalive()} is invoked periodically.
 *
 * <p>Every mutating public method wraps its work in an {@link ActiveObject}
 * and enqueues it on an internal {@link Stage}; {@link #handle(Object)}
 * executes whatever object it is handed. The keep-alive timer is started
 * when the transport set becomes non-empty and stopped when it becomes
 * empty again.
 *
 * <p>NOTE(review): the stage is configured with up to 10 threads while
 * {@code monitors} and {@code transports} are plain {@code HashMap} /
 * {@code HashSet}, and {@link #getAgentState(String)} reads the map directly
 * from the caller's thread. Whether the stage serialises the queued objects
 * cannot be told from this file - confirm before relying on it.
 */
public class AgentMonitorManager implements Timeoutable, StageHandler {
    /** Agent name (String) -> AgentMonitor. */
    private final Map monitors;
    /** All Transport objects that receive keepalive() on each timeout. */
    private final Set transports;
    private final TimeoutTimer timer;
    private final Stage stage;
    /**
     * Delay passed to the timer: the configured value times 1000
     * (presumably seconds converted to milliseconds - TODO confirm).
     */
    private final int keepaliveInterval;

    /**
     * Creates the manager: reads {@code network.keepalive.interval}
     * (default 1) from the configuration, creates the timer and builds the
     * stage that uses this object as its handler.
     */
    public AgentMonitorManager() {
        this.monitors = new HashMap();
        this.transports = new HashSet();

        ConfigManager config =
            MantaAgent.getInstance().getSingletonRepository().getConfigManager();
        this.keepaliveInterval =
            config.getIntProperty("network.keepalive.interval", 1) * 1000;

        this.timer = new TimeoutTimer("Agent_monitor", 100, 1);
        this.timer.setName("AgentMonitorManager Timer");

        StageParams params = new StageParams();
        params.setBlocking(false);
        params.setPersistent(false);
        params.setStageName("AgentMonitor");
        params.setHandler(this);
        params.setNumberOfStartThreads(1);
        params.setMaxNumberOfThreads(10);
        params.setStagePriority(0);
        this.stage = new Stage(params);
    }

    /**
     * Queues the registration of a listener for an agent.
     *
     * @param agent         name of the agent to monitor
     * @param newTransports transports to add to the keep-alive set; may be
     *                      {@code null}
     * @param listener      listener to attach to the agent's monitor
     */
    public void addMonitor(final String agent, final Set newTransports,
                           final AgentStateListener listener) {
        this.stage.enqueue(new ActiveObject() {
            public boolean call() {
                doAddMonitor(agent, newTransports, listener);
                return true;
            }
        });
    }

    /**
     * Creates the agent's monitor if none exists, attaches the listener,
     * merges the transports into the keep-alive set and starts the timer
     * when that set goes from empty to non-empty.
     */
    private void doAddMonitor(String agent, Set newTransports,
                              AgentStateListener listener) {
        AgentMonitor monitor = (AgentMonitor) this.monitors.get(agent);
        if (monitor == null) {
            monitor = new AgentMonitor(agent, newTransports, this);
            this.monitors.put(agent, monitor);
        }
        monitor.addListener(listener);

        // Remember the state before the merge so the timer is started only
        // on the empty -> non-empty transition.
        boolean wasEmpty = this.transports.isEmpty();
        if (newTransports != null) {
            this.transports.addAll(newTransports);
        }
        if (wasEmpty && !this.transports.isEmpty()) {
            startTimer();
        }
    }

    /**
     * Queues the removal of one listener from an agent's monitor.
     *
     * @param agent              name of the monitored agent
     * @param toRemoveTransports transports to drop from the keep-alive set
     *                           if the monitor ends up without listeners;
     *                           may be {@code null}
     * @param listener           listener to detach
     */
    public void removeMonitor(final String agent, final Set toRemoveTransports,
                              final AgentStateListener listener) {
        this.stage.enqueue(new ActiveObject() {
            public boolean call() {
                doRemoveMonitor(agent, toRemoveTransports, listener);
                return true;
            }
        });
    }

    /**
     * Detaches the listener; when the monitor has no listeners left it is
     * shut down and removed from the map, the given transports are dropped
     * and the timer is stopped if no transports remain.
     */
    private void doRemoveMonitor(String agent, Set toRemoveTransports,
                                 AgentStateListener listener) {
        AgentMonitor monitor = (AgentMonitor) this.monitors.get(agent);
        if (monitor == null) {
            return;
        }
        monitor.removeListener(listener);
        if (monitor.hasListeners()) {
            return;
        }
        monitor.shutdown();
        this.monitors.remove(agent);
        discardTransports(toRemoveTransports);
    }

    /**
     * Queues the shutdown of an agent's monitor regardless of its listeners.
     *
     * @param agent              name of the monitored agent
     * @param toRemoveTransports transports to drop from the keep-alive set;
     *                           may be {@code null}
     */
    public void removeMonitor(final String agent, final Set toRemoveTransports) {
        this.stage.enqueue(new ActiveObject() {
            public boolean call() {
                doRemoveMonitor(agent, toRemoveTransports);
                return true;
            }
        });
    }

    /**
     * Shuts the agent's monitor down and drops the given transports.
     *
     * <p>NOTE(review): unlike the three-argument overload, the monitor is
     * left in {@code monitors} after shutdown. This looks deliberate in the
     * original and is preserved - confirm whether a shut-down monitor is
     * meant to be reused by a later {@code addMonitor}.
     */
    private void doRemoveMonitor(String agent, Set toRemoveTransports) {
        AgentMonitor monitor = (AgentMonitor) this.monitors.get(agent);
        if (monitor == null) {
            return;
        }
        monitor.shutdown();
        discardTransports(toRemoveTransports);
    }

    /**
     * Removes the given transports from the keep-alive set and stops the
     * timer when the set becomes empty. A {@code null} argument is a no-op.
     */
    private void discardTransports(Set toRemoveTransports) {
        if (toRemoveTransports == null) {
            return;
        }
        this.transports.removeAll(toRemoveTransports);
        if (this.transports.isEmpty()) {
            stopTimer();
        }
    }

    /**
     * Queues the addition of a transport to an agent that is already
     * monitored.
     *
     * @param agent name of the monitored agent
     * @param t     transport to add
     */
    public void addTransport(final String agent, final Transport t) {
        this.stage.enqueue(new ActiveObject() {
            public boolean call() {
                doAddTransport(agent, t);
                return true;
            }
        });
    }

    /**
     * Adds the transport to the agent's monitor and to the keep-alive set.
     * Does nothing when the agent has no monitor.
     */
    private void doAddTransport(String agent, Transport t) {
        AgentMonitor monitor = (AgentMonitor) this.monitors.get(agent);
        if (monitor == null) {
            return;
        }
        monitor.addTransport(t);

        // Start the timer only on the empty -> non-empty transition, as
        // doAddMonitor does. Testing size() == 1 after the add would
        // schedule a second timeout when the single transport already in
        // the set is added again.
        boolean wasEmpty = this.transports.isEmpty();
        this.transports.add(t);
        if (wasEmpty) {
            startTimer();
        }
    }

    /**
     * Queues the removal of a transport from a monitored agent.
     *
     * @param agent name of the monitored agent
     * @param t     transport to remove
     */
    public void removeTransport(final String agent, final Transport t) {
        this.stage.enqueue(new ActiveObject() {
            public boolean call() {
                doRemoveTransport(agent, t);
                return true;
            }
        });
    }

    /**
     * Removes the transport from the agent's monitor and from the
     * keep-alive set, stopping the timer when no transports remain.
     * Does nothing when the agent has no monitor.
     */
    private void doRemoveTransport(String agent, Transport t) {
        AgentMonitor monitor = (AgentMonitor) this.monitors.get(agent);
        if (monitor == null) {
            return;
        }
        monitor.removeTransport(t);
        this.transports.remove(t);
        if (this.transports.isEmpty()) {
            stopTimer();
        }
    }

    /**
     * Returns the state reported by the agent's monitor.
     *
     * @param agent name of the agent
     * @return the monitor's state, or
     *         {@link AgentStateListener#AGENT_STATE_NOT_MONITORING} when
     *         the agent has no monitor
     */
    public int getAgentState(String agent) {
        AgentMonitor monitor = (AgentMonitor) this.monitors.get(agent);
        if (monitor == null) {
            return AgentStateListener.AGENT_STATE_NOT_MONITORING;
        }
        return monitor.getState();
    }

    /** Schedules this object on the timer after {@code keepaliveInterval}. */
    private void startTimer() {
        this.timer.addTimeout(this, this, keepaliveInterval);
    }

    /** Removes this object's pending timeout from the timer. */
    private void stopTimer() {
        this.timer.removeTimeout(this);
    }

    /**
     * Queues the delivery of a state-change notification to a listener.
     *
     * @param agent    name of the agent whose state changed
     * @param state    the new state
     * @param listener listener to notify
     */
    public void sendEvent(final String agent, final int state,
                          final AgentStateListener listener) {
        this.stage.enqueue(new ActiveObject() {
            public boolean call() {
                doSendEvent(agent, state, listener);
                return true;
            }
        });
    }

    /** Invokes {@code agentStateChanged} on the listener. */
    private void doSendEvent(String agent, int state,
                             AgentStateListener listener) {
        listener.agentStateChanged(agent, state);
    }

    /**
     * Timer callback: queues the keep-alive round on the stage instead of
     * running it in the calling thread.
     *
     * @param event the object supplied by the timer
     */
    public void timeout(final Object event) {
        this.stage.enqueue(new ActiveObject() {
            public boolean call() {
                doTimeout(event);
                return true;
            }
        });
    }

    /**
     * Calls {@code keepalive()} on every registered transport and re-arms
     * the timer while transports remain.
     */
    private void doTimeout(Object event) {
        Iterator i = this.transports.iterator();
        while (i.hasNext()) {
            Transport t = (Transport) i.next();
            t.keepalive();
        }
        // The last transport may have been removed (and stopTimer() called)
        // after this timeout fired but before it was processed. Re-arming
        // unconditionally would leave a timeout running with no transports,
        // and the next startTimer() would add a second one.
        if (!this.transports.isEmpty()) {
            startTimer();
        }
    }

    /**
     * Stage callback: runs the queued {@link ActiveObject}.
     *
     * @param o the queued object; must be an {@code ActiveObject}
     * @return always {@code true}; the result of {@code call()} is ignored
     */
    public boolean handle(Object o) {
        ActiveObject ao = (ActiveObject) o;
        ao.call();
        return true;
    }
}