1 9 package org.jboss.remoting.detection; 10 11 12 import java.util.ArrayList ; 13 import java.util.HashMap ; 14 import java.util.HashSet ; 15 import java.util.Iterator ; 16 import java.util.List ; 17 import java.util.Map ; 18 import java.util.Set ; 19 import java.util.Timer ; 20 import java.util.TimerTask ; 21 import javax.management.MBeanServer ; 22 import javax.management.ObjectName ; 23 import org.jboss.logging.Logger; 24 import org.jboss.remoting.ConnectionValidator; 25 import org.jboss.remoting.InvokerLocator; 26 import org.jboss.remoting.InvokerRegistry; 27 import org.jboss.remoting.ServerInvoker; 28 import org.jboss.remoting.ident.Identity; 29 import org.jboss.remoting.network.NetworkRegistryFinder; 30 import org.jboss.remoting.transport.ClientInvoker; 31 import org.w3c.dom.Element ; 32 import org.w3c.dom.Node ; 33 import org.w3c.dom.NodeList ; 34 35 36 43 public abstract class AbstractDetector implements Detector, AbstractDetectorMBean 44 { 45 private long defaultTimeDelay = 5000; 46 private long heartbeatTimeDelay = 1000; 47 protected final Logger log = Logger.getLogger(getClass()); 48 protected MBeanServer mbeanserver; 49 protected ObjectName objectName; 50 protected ObjectName registryObjectName; 51 52 private Identity myself; 53 private Timer heartbeatTimer; 54 private Timer failureTimer; 55 private Map servers = new HashMap (); 56 private Element xml; 57 private Set domains = new HashSet (); 58 59 public AbstractDetector() 60 { 61 } 62 63 69 public void setHeartbeatTimeDelay(long heartbeatTimeDelay) 70 { 71 if(heartbeatTimeDelay > 0 && heartbeatTimeDelay < defaultTimeDelay) 72 { 73 this.heartbeatTimeDelay = heartbeatTimeDelay; 74 } 75 else 76 { 77 throw new IllegalArgumentException ("Can not set heartbeat time delay (" + heartbeatTimeDelay + ") to a negative number or " + 78 "to a number greater than the default time delay (" + defaultTimeDelay + ")."); 79 } 80 } 81 82 87 public long getHeartbeatTimeDelay() 88 { 89 return heartbeatTimeDelay; 90 } 91 92 99 public void setDefaultTimeDelay(long defaultTimeDelay) 100 { 101 if(defaultTimeDelay >= heartbeatTimeDelay) 102 { 103 this.defaultTimeDelay = defaultTimeDelay; 104 } 105 else 106 { 107 throw new IllegalArgumentException ("Can not set the default time delay (" + defaultTimeDelay + ") to be less" + 108 " than that of the heartbeat time delay (" + heartbeatTimeDelay + ")."); 109 } 110 } 111 112 116 public long getDefaultTimeDelay() 117 { 118 return defaultTimeDelay; 119 } 120 121 127 public Detection createDetection() 128 { 129 Detection detection = null; 130 131 ServerInvoker invokers[] = InvokerRegistry.getServerInvokers(); 132 if(invokers == null || invokers.length <= 0) 133 { 134 return detection; 135 } 136 List l = new ArrayList (invokers.length); 137 for(int c = 0; c < invokers.length; c++) 138 { 139 if(invokers[c].isStarted()) 140 { 141 ServerInvokerMetadata serverInvoker = new ServerInvokerMetadata(invokers[c].getLocator(), 142 invokers[c].getSupportedSubsystems()); 143 l.add(serverInvoker); 144 } 145 } 146 if(l.isEmpty()) 147 { 148 return detection; 149 } 150 ServerInvokerMetadata metadata[] = (ServerInvokerMetadata[]) l.toArray(new ServerInvokerMetadata[l.size()]); 151 detection = new Detection(Identity.get(mbeanserver), metadata); 152 return detection; 153 } 154 155 160 public void start() throws Exception 161 { 162 myself = Identity.get(mbeanserver); 164 165 if(domains.isEmpty() && xml == null) 167 { 168 domains.add(myself.getDomain()); 169 } 170 171 registryObjectName = NetworkRegistryFinder.find(mbeanserver); 173 if(registryObjectName == null) 174 { 175 log.warn("Detector: " + getClass().getName() + " could not be loaded because the NetworkRegistry is not registered"); 176 log.warn("This means that only the broadcasting of detection messages will be functional and will not be able to discover other servers."); 177 } 178 179 startPinger(getPingerDelay(), getPingerPeriod()); 180 startHeartbeat(getHeartbeatDelay(), getHeartbeatPeriod()); 181 } 182 183 189 protected long getPingerDelay() 190 { 191 return 5000; 192 } 193 194 200 protected long getPingerPeriod() 201 { 202 return 1500; 203 } 204 205 211 protected void startPinger(long delay, long period) 212 { 213 failureTimer = new Timer (false); 214 failureTimer.schedule(new FailureDetector(), delay, period); 215 } 216 217 220 protected void stopPinger() 221 { 222 if(failureTimer != null) 223 { 224 failureTimer.cancel(); 225 failureTimer = null; 226 } 227 } 228 229 234 public void stop() throws Exception 235 { 236 stopPinger(); 237 stopHeartbeat(); 238 stopPinger(); 239 } 240 241 public void postDeregister() 242 { 243 } 244 245 public void postRegister(Boolean aBoolean) 246 { 247 } 248 249 public void preDeregister() throws Exception 250 { 251 } 252 253 public ObjectName preRegister(MBeanServer mBeanServer, ObjectName objectName) throws Exception 254 { 255 this.mbeanserver = mBeanServer; 256 this.objectName = objectName; 257 return objectName; 258 } 259 260 267 public void setConfiguration(Element xml) 268 throws Exception 269 { 270 this.xml = xml; 271 272 if(xml != null) 274 { 275 domains.clear(); 277 278 NodeList domainNodes = xml.getElementsByTagName("domain"); 279 if(domainNodes == null || domainNodes.getLength() <= 0) 280 { 281 log.debug("No domains specified. Will accept all domains."); 283 } 284 int len = domainNodes.getLength(); 285 for(int c = 0; c < len; c++) 286 { 287 Node node = domainNodes.item(c); 288 String domain = node.getFirstChild().getNodeValue(); 289 domains.add(domain); 290 log.debug("Added domain " + domain + " to detector list."); 291 } 292 } 293 } 294 295 301 public Element getConfiguration() 302 { 303 return xml; 304 } 305 306 308 314 protected void startHeartbeat(long delay, long period) 315 { 316 if(heartbeatTimer == null) 317 { 318 heartbeatTimer = new Timer (false); 319 } 320 heartbeatTimer.schedule(new Heartbeat(), delay, period); 321 } 322 323 326 protected void stopHeartbeat() 327 { 328 if(heartbeatTimer != null) 329 { 330 try 331 { 332 heartbeatTimer.cancel(); 333 } 334 catch(Exception eg) 335 { 336 } 337 heartbeatTimer = null; 338 } 339 } 340 341 347 protected long getHeartbeatDelay() 348 { 349 return 0; 350 } 351 352 358 protected long getHeartbeatPeriod() 359 { 360 return heartbeatTimeDelay; 361 } 362 363 367 protected abstract void heartbeat(); 368 369 374 protected void detect(Detection detection) 375 { 376 if(log.isTraceEnabled()) 377 { 378 log.trace("Detection message received."); 379 log.trace("Id = " + detection.getIdentity().getInstanceId()); 380 log.trace("isRemoteDetection() = " + isRemoteDetection(detection)); 381 } 382 if(isRemoteDetection(detection)) 384 { 385 try 386 { 387 boolean found = false; 388 Server server = null; 389 390 synchronized(servers) 391 { 392 server = (Server ) servers.get(detection); 393 found = server != null; 394 if(!found) 395 { 396 servers.put(detection, (server = new Server (detection))); 398 } 399 else 400 { 401 server.lastDetection = System.currentTimeMillis(); 402 } 403 } 404 if(found == false) 405 { 406 if(registryObjectName != null) 407 { 408 log.debug("detected NEW server: " + detection.getIdentity()); 409 mbeanserver.invoke(registryObjectName, "addServer", new Object []{detection.getIdentity(), 410 detection.getServerInvokers()}, 411 new String []{Identity.class.getName(), ServerInvokerMetadata[].class.getName()}); 412 } 413 } 414 else 415 { 416 if(server.changed(detection)) 417 { 418 server.rehash(detection); 420 if(registryObjectName != null) 421 { 422 if(log.isTraceEnabled()) 423 { 424 log.trace("detected UPDATE for server: " + detection.getIdentity()); 425 } 426 mbeanserver.invoke(registryObjectName, "updateServer", new Object []{detection.getIdentity(), 427 detection.getServerInvokers()}, 428 new String []{Identity.class.getName(), ServerInvokerMetadata[].class.getName()}); 429 } 430 } 431 } 432 } 433 catch(javax.management.InstanceNotFoundException inf) 434 { 435 return; 436 } 437 catch(Exception e) 438 { 439 log.error("Error during detection of: " + detection, e); 440 } 441 } 442 else if(log.isTraceEnabled()) 443 { 444 log.trace("detection from myself - ignored"); 445 } 446 } 447 448 protected boolean isRemoteDetection(Detection detection) 449 { 450 String domain = detection.getIdentity().getDomain(); 451 return (domains.isEmpty() || domains.contains(domain)) && 454 myself.isSameJVM(detection.getIdentity()) == false; 455 } 456 457 protected boolean checkInvokerServer(Detection detection, ClassLoader cl) 458 { 459 boolean ok = false; 460 InvokerLocator il[] = detection.getLocators(); 461 for(int c = 0; c < il.length; c++) 462 { 463 try 464 { 465 ClientInvoker ci = InvokerRegistry.createClientInvoker(il[c]); 466 467 if(ci.isConnected() == false) 468 { 469 ci.connect(); 470 } 471 472 boolean isValid = ConnectionValidator.checkConnection(ci); 473 if(isValid) 474 { 475 ok = true; 477 break; 478 } 479 480 } 481 catch(Throwable ig) 482 { 483 log.debug("failed calling ping on " + detection, ig); 484 InvokerRegistry.destroyClientInvoker(il[c]); 486 break; 489 } 490 } 491 if(ok == false) 492 { 493 try 495 { 496 if(registryObjectName != null) 497 { 498 mbeanserver.invoke(registryObjectName, "removeServer", new Object []{detection.getIdentity()}, 499 new String []{Identity.class.getName()}); 500 log.debug("Removed detection " + detection); 501 } 502 } 503 catch(Exception ex) 504 { 505 log.warn("Error removing server", ex); 506 } 507 finally 508 { 509 servers.remove(detection); 511 } 512 } 513 514 return ok; 515 } 516 517 518 private final class FailureDetector extends TimerTask 519 { 520 public void run() 521 { 522 if(servers.isEmpty()) 523 { 524 return; 525 } 526 Map map = null; 529 synchronized(servers) 530 { 531 map = new HashMap (servers); 532 } 533 ClassLoader cl = AbstractDetector.this.getClass().getClassLoader(); 534 Iterator iter = map.keySet().iterator(); 536 while(iter.hasNext()) 537 { 538 Detection detection = (Detection) iter.next(); 539 long lastDetection = 0; 540 Server server = null; 541 synchronized(servers) 542 { 543 server = (Server ) map.get(detection); 544 lastDetection = server.lastDetection; 545 } 546 long duration = System.currentTimeMillis() - lastDetection; 547 if(duration >= defaultTimeDelay) 548 { 549 if(log.isTraceEnabled()) 550 { 551 log.trace("detection for: " + detection + " has not been received in: " + defaultTimeDelay + " ms, contacting.."); 552 } 553 if(checkInvokerServer(detection, cl)) 557 { 558 if(log.isTraceEnabled()) 559 { 560 log.trace("detection for: " + detection + " recovered on ping"); 561 } 562 server.lastDetection = System.currentTimeMillis(); 563 } 564 } 565 } 566 } 567 568 } 569 570 private final class Server 571 { 572 Detection detection; 573 private int hashCode = 0; 574 long lastDetection = System.currentTimeMillis(); 575 576 Server(Detection detection) 577 { 578 rehash(detection); 579 } 580 581 private void rehash(Detection d) 582 { 583 this.hashCode = hash(d); 584 } 585 586 private int hash(Detection d) 587 { 588 int hc = 0; 589 InvokerLocator locators[] = d.getLocators(); 590 if(locators != null) 591 { 592 for(int c = 0; c < locators.length; c++) 593 { 594 hc += locators[c].hashCode(); 595 } 596 } 597 return hc; 598 } 599 600 boolean changed(Detection detection) 601 { 602 return hashCode != hash(detection); 603 } 604 605 public boolean equals(Object obj) 606 { 607 return obj instanceof Server && hashCode == obj.hashCode(); 608 } 609 610 public int hashCode() 611 { 612 return hashCode; 613 } 614 } 615 616 private final class Heartbeat extends TimerTask 617 { 618 public void run() 619 { 620 InvokerLocator il[] = InvokerRegistry.getRegisteredServerLocators(); 621 if(il != null && il.length > 0) 622 { 623 heartbeat(); 626 } 627 } 628 } 629 } 630 | Popular Tags |