package com.go.teaservlet.util.cluster;

import java.io.*;
import java.net.*;
import java.rmi.Naming;
import java.rmi.NoSuchObjectException;
import java.rmi.RemoteException;
import java.rmi.server.RemoteServer;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.AlreadyBoundException;
import java.util.StringTokenizer;
import java.util.Iterator;
import java.util.Collection;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Vector;
import com.go.trove.util.PropertyMap;
import com.go.trove.log.Syslog;

/**
 * Binds a {@link Clustered} object into an RMI registry and locates its
 * peers, either from an explicit list of server names or through multicast
 * "join"/"ping"/"leave" messages sent on a {@link MulticastSocket}.
 */
public class ClusterManager {

    /**
     * Port of the RMI registry created by this class, or -1 when no
     * registry has been created yet. Shared by every instance.
     */
    private static int cActiveRegistryPort = -1;

    /**
     * Builds a ClusterManager from the "cluster" sub-map of the given
     * properties. The keys read are <code>servers</code>,
     * <code>localNet</code>, <code>rmi.port</code> (default 1099),
     * <code>multicast.port</code> (default 1099) and
     * <code>multicast.group</code>.
     *
     * @param properties configuration; may be null
     * @param clusterObj the object to bind and share with the cluster
     * @return a manager using multicast when the multicast group resolves,
     * a manager using only the explicit server list when just
     * <code>servers</code> is usable, or null when nothing is configured
     * @throws Exception if a ClusterManager constructor fails
     */
    public static ClusterManager createClusterManager(PropertyMap properties,
                                                      Clustered clusterObj)
        throws Exception {

        if (properties == null) {
            return null;
        }

        properties = properties.subMap("cluster");
        if (properties.keySet().size() <= 0) {
            return null;
        }

        String servers = properties.getString("servers");
        String netInterface = properties.getString("localNet");
        int rmiPort = properties.getInt("rmi.port", 1099);
        int multicastPort = properties.getInt("multicast.port", 1099);
        String group = properties.getString("multicast.group");

        InetAddress multicastGroup = null;
        if (group != null) {
            try {
                multicastGroup = InetAddress.getByName(group);
            }
            catch (UnknownHostException uhe) {
                // Not fatal: fall back to the explicit server list below,
                // but make the misconfiguration visible.
                Syslog.warn("Unable to resolve multicast group " + group);
                Syslog.warn(uhe);
            }
        }

        if (multicastGroup != null) {
            return new ClusterManager(clusterObj, multicastGroup,
                                      multicastPort, rmiPort,
                                      netInterface, servers);
        }
        if (servers != null) {
            return new ClusterManager(clusterObj, rmiPort, servers);
        }
        return null;
    }

    private static final boolean DEBUG = true;

    /**
     * Explicitly specified names not yet resolved to peers. Also used as
     * the lock guarding its own contents, so it is never reassigned.
     */
    private final Vector mUnresolvedServerNames;
    private final Vector mExplicitlySpecifiedServerNames;
    private MulticastSocket mSock;
    private InetAddress mMultiGroup;
    private String mHostName;
    private Clustered mCluster;
    private int mMultiPort, mRmiPort;
    private Registry mLocalRegistry;
    private AutomaticClusterManagementThread mAuto;

    /**
     * Wraps the restartable object in a {@link TeaServletClusterHook} and
     * sets up both RMI and multicast.
     */
    public ClusterManager(Restartable restartableObj, String clusterName,
                          String serverName, InetAddress multicastGroup,
                          int multicastPort, int rmiPort, String netInterface,
                          String serverNames)
        throws IOException, RemoteException {

        this(new TeaServletClusterHook(restartableObj, clusterName, serverName),
             multicastGroup, multicastPort, rmiPort, netInterface, serverNames);
    }

    /**
     * Sets up RMI and multicast with no network interface restriction and
     * no explicit server names.
     */
    public ClusterManager(Clustered cluster, InetAddress multicastGroup,
                          int multicastPort, int rmiPort) throws IOException {
        this(cluster, multicastGroup, multicastPort, rmiPort, null, null);
    }

    /**
     * Sets up RMI, records the multicast group and port, and joins the
     * multicast group.
     *
     * @param netInterface dotted network address, optionally followed by
     * "/prefixLength", used to choose the local interface; may be null
     * @param serverNames explicit peer host names separated by comma,
     * semicolon or space; may be null
     */
    public ClusterManager(Clustered cluster, InetAddress multicastGroup,
                          int multicastPort, int rmiPort,
                          String netInterface, String serverNames)
        throws IOException {

        this(cluster, rmiPort, serverNames);
        mMultiGroup = multicastGroup;
        mMultiPort = multicastPort;
        setUpMulticast(multicastGroup, multicastPort,
                       cluster.getClusterName(),
                       cluster.getServerName(), netInterface);
    }

    /**
     * Wraps the restartable object in a {@link TeaServletClusterHook} and
     * sets up RMI only.
     */
    public ClusterManager(Restartable restartableObj, String clusterName,
                          String serverName, int rmiPort, String serverNames)
        throws IOException {

        this(new TeaServletClusterHook(restartableObj, clusterName, serverName),
             rmiPort, serverNames);
    }

    /**
     * Sets up RMI only: binds the cluster object into a registry and
     * records the explicitly specified server names (lower-cased).
     *
     * @param rmiPort registry port; values of zero or less select 1099
     * @param serverNames explicit peer host names separated by comma,
     * semicolon or space; may be null
     */
    public ClusterManager(Clustered cluster, int rmiPort, String serverNames)
        throws IOException {

        mRmiPort = (rmiPort > 0) ? rmiPort : 1099;
        mCluster = cluster;

        // Use the normalised port so the registry is created on the same
        // port that getRMIPort() reports and peer lookups use.
        mLocalRegistry = prepareRegistry(cluster, mRmiPort);

        mExplicitlySpecifiedServerNames = new Vector();
        if (serverNames != null) {
            StringTokenizer st = new StringTokenizer(serverNames, ",; ");
            while (st.hasMoreTokens()) {
                mExplicitlySpecifiedServerNames
                    .add(st.nextToken().toLowerCase());
            }
        }
        mUnresolvedServerNames = new Vector(mExplicitlySpecifiedServerNames);
    }

    /**
     * Looks up every unresolved server name through RMI, adds newly found
     * peers to the cluster, and drops known peers that no longer answer.
     *
     * @return the server names of the reachable known peers, or null if
     * the local cluster object itself threw a RemoteException
     */
    public String[] resolveServerNames() {

        synchronized (mUnresolvedServerNames) {
            Iterator resolveIt = mUnresolvedServerNames.iterator();
            while (resolveIt.hasNext()) {
                String nextHost = (String)resolveIt.next();
                try {
                    String namingURL =
                        ("//" + nextHost + ':' + mRmiPort
                         + '/' + mCluster.getClusterName());

                    if (DEBUG) {
                        Syslog.debug("Looking Up " + namingURL);
                    }

                    Clustered bcl = (Clustered)Naming.lookup(namingURL);

                    if (bcl != null) {
                        if (!mCluster.containsPeer(bcl)) {
                            resolveIt.remove();
                            mCluster.addPeer(bcl);
                            Syslog.debug("Successfullly resolved "
                                         + bcl.getServerName() +
                                         " as a member of the "
                                         + mCluster.getClusterName()
                                         + " cluster.");
                        }
                    }
                    continue;
                }
                catch (Exception e) {
                    Syslog.debug(e);
                }
                Syslog.debug("Failed to resolve "
                             + nextHost + " as part of this cluster");
            }
        }

        try {
            Clustered[] peers = getCluster().getKnownPeers();
            ArrayList peerNames = new ArrayList();
            boolean lostOne = false;
            for (int j = 0; j < peers.length; j++) {
                try {
                    peerNames.add(peers[j].getServerName());
                }
                catch (RemoteException re) {
                    getCluster().removePeer(peers[j]);
                    lostOne = true;
                }
            }
            if (lostOne) {
                // Rebuild the unresolved list in place; reassigning the
                // field would change the object other threads lock on.
                synchronized (mUnresolvedServerNames) {
                    mUnresolvedServerNames.clear();
                    mUnresolvedServerNames
                        .addAll(mExplicitlySpecifiedServerNames);
                    mUnresolvedServerNames.removeAll(peerNames);
                }
            }
            return (String[])peerNames.toArray(new String[peerNames.size()]);
        }
        catch (RemoteException re2) {
            Syslog.debug(re2);
        }

        return null;
    }

    /** Returns the cluster object managed by this instance. */
    public Clustered getCluster() {
        return mCluster;
    }

    /** Returns the RMI port in use (1099 if none was specified). */
    public int getRMIPort() {
        return mRmiPort;
    }

    /** Returns the multicast port, or 0 if multicast was not configured. */
    public int getMulticastPort() {
        return mMultiPort;
    }

    /** Returns the multicast group, or null if multicast was not configured. */
    public InetAddress getMulticastGroup() {
        return mMultiGroup;
    }

    /**
     * Sends the message to the multicast group.
     *
     * @throws IOException if the multicast socket has not been set up or
     * the send fails
     */
    public void send(byte[] msg) throws IOException {
        MulticastSocket sock = mSock;
        if (sock == null) {
            throw new IOException("Multicast socket has not been set up");
        }
        try {
            sock.send(new DatagramPacket(msg, msg.length,
                                         mMultiGroup, mMultiPort));
        }
        catch (SecurityException se) {
            Syslog.warn(se);
        }
    }

    /**
     * Blocks until a packet of at most 1024 bytes arrives on the multicast
     * socket.
     *
     * @throws IOException if the multicast socket has not been set up or
     * the receive fails
     */
    public DatagramPacket getNextPacket() throws IOException {
        MulticastSocket sock = mSock;
        if (sock == null) {
            throw new IOException("Multicast socket has not been set up");
        }
        DatagramPacket pack = new DatagramPacket(new byte[1024], 1024);
        try {
            sock.receive(pack);
        }
        catch (SecurityException se) {
            Syslog.warn(se);
        }
        return pack;
    }

    /** Equivalent to <code>launchAuto(true)</code>. */
    public void launchAuto() {
        launchAuto(true);
    }

    /**
     * Stops any running management thread and, if multicast is set up,
     * starts a new one.
     *
     * @param active passed through to the management thread constructor
     */
    public void launchAuto(boolean active) {
        killAuto();
        if (mSock == null) {
            return;
        }
        try {
            mAuto = new AutomaticClusterManagementThread
                (this, mCluster.getClusterName(), active);
        }
        catch (Exception e) {
            // Fall back to the constructor that takes no cluster name.
            mAuto = new AutomaticClusterManagementThread(this, active);
        }
        mAuto.start();
    }

    /**
     * Stops any running management thread and, if multicast is set up,
     * starts the supplied one.
     */
    public void launchAuto(AutomaticClusterManagementThread auto) {
        killAuto();
        if (mSock != null) {
            mAuto = auto;
            if (mAuto != null) {
                mAuto.start();
            }
        }
    }

    /** Kills the management thread, if there is one. */
    public void killAuto() {
        if (mAuto != null) {
            mAuto.kill();
        }
        mAuto = null;
    }

    /** Multicasts a "join~cluster~host" message. */
    public void joinCluster() throws IOException {
        send(("join~" + getCluster().getClusterName()
              + '~' + getHostName()).getBytes());
    }

    /** Multicasts a "ping~cluster~host" message. */
    public void pingCluster() throws IOException {
        send(("ping~" + getCluster().getClusterName()
              + '~' + getHostName()).getBytes());
    }

    /**
     * Returns the host name used in multicast messages, looking up the
     * local host name on first use if none has been set.
     */
    public String getHostName() throws IOException {
        if (mHostName == null) {
            mHostName = InetAddress.getLocalHost().getHostName();
        }
        return mHostName;
    }

    /**
     * Kills the management thread, multicasts a "leave" message, leaves
     * the group and closes the socket. The socket is closed even when the
     * message or the group departure fails.
     */
    public void destroy() throws IOException {
        killAuto();
        if (mSock != null) {
            try {
                send(("leave~" + mCluster.getClusterName()
                      + '~' + getHostName()).getBytes());
                mSock.leaveGroup(mMultiGroup);
            }
            finally {
                mSock.close();
                mSock = null;
            }
        }
    }

    /**
     * Obtains an RMI registry and binds the cluster object under its
     * cluster name. A registry is created on the given port the first
     * time; afterwards the already active port is reused.
     *
     * @return the registry the cluster object was bound into
     * @throws IOException if no usable registry could be obtained or the
     * bind fails
     */
    protected Registry prepareRegistry(Clustered cluster, int port)
        throws IOException {

        Registry reg = null;

        if (cActiveRegistryPort < 0) {
            try {
                reg = LocateRegistry.createRegistry(port);
                cActiveRegistryPort = port;
            }
            catch (Exception e) {
                reg = null;
                cActiveRegistryPort = -1;
                Syslog.warn(e);
            }
        }
        else {
            if (cActiveRegistryPort != port) {
                Syslog.warn("An active RMI registry already exists on "
                            + cActiveRegistryPort
                            + " this port will be used in lieu of port "
                            + port + ".");
            }

            port = cActiveRegistryPort;
            try {
                reg = LocateRegistry.getRegistry(port);
                // getRegistry only builds a stub; list() forces a call.
                reg.list();
            }
            catch (Exception e) {
                reg = null;
                Syslog.warn(e);
            }
        }

        if (reg == null) {
            throw new IOException("Failed to connect to a valid RMI registry");
        }

        try {
            reg.bind(cluster.getClusterName(), cluster);
        }
        catch (AlreadyBoundException abe) {
            reg.rebind(cluster.getClusterName(), cluster);
        }
        catch (NoSuchObjectException nsoe) {
            Syslog.warn(nsoe);
            if (nsoe.detail != null) {
                Syslog.warn(nsoe.detail);
            }
            else {
                Syslog.warn("No detail available");
            }
        }

        Syslog.info(cluster.getClusterName() + " bound on " +
                    cluster.getServerName() + ':' + port);
        return reg;
    }

    /** Sets up multicast using the local host name and default interface. */
    protected void setUpMulticast(InetAddress group, int port,
                                  String clusterName) throws IOException {
        setUpMulticast(group, port, clusterName, null, null);
    }

    /** Sets up multicast for the given host using the default interface. */
    protected void setUpMulticast(InetAddress group, int port,
                                  String clusterName, String host)
        throws IOException {

        setUpMulticast(group, port, clusterName, host, null);
    }

    /**
     * Opens the multicast socket, optionally binds it to the local address
     * that lies in the given network, and joins the group. Does nothing if
     * a socket is already open. If setup fails the socket is closed and
     * this manager is left without one.
     *
     * @param host host name whose addresses are candidates for the
     * interface; null selects the local host name
     * @param netInterface dotted network address, optionally followed by
     * "/prefixLength" (default mask 255.255.255.0); may be null
     */
    protected void setUpMulticast(InetAddress group, int port,
                                  String clusterName, String host,
                                  String netInterface)
        throws IOException {

        if (DEBUG) { Syslog.debug("Setting up Multicast"); }

        if (mSock != null) {
            return;
        }

        MulticastSocket sock = new MulticastSocket(port);
        boolean ready = false;
        try {
            if (host == null) {
                host = getHostName();
            }
            else {
                mHostName = host;
            }

            InetAddress interf = sock.getInterface();
            InetAddress[] addresses = InetAddress.getAllByName(host);

            if (DEBUG) {
                Syslog.debug("addresses on this host.");
                for (int j = 0; j < addresses.length; j++) {
                    Syslog.debug(addresses[j].getHostAddress());
                }
                Syslog.debug("current interface IP: "
                             + interf.getHostAddress());
            }

            if (netInterface != null) {
                try {
                    bindToMatchingInterface(sock, addresses, netInterface);
                }
                catch (Exception e) {
                    // Best effort: keep the default interface.
                    Syslog.warn(e);
                }
            }

            sock.setSendBufferSize(1024);
            sock.setReceiveBufferSize(1024);
            sock.setTimeToLive(1);
            sock.joinGroup(group);

            // Publish the socket only once it is fully configured.
            mSock = sock;
            ready = true;
        }
        finally {
            if (!ready) {
                sock.close();
            }
        }
    }

    /**
     * Binds the socket to the first IPv4 candidate address that falls in
     * the network described by netInterface, and records that address as
     * the host name. Does nothing if netInterface is not four dotted
     * tokens.
     */
    private void bindToMatchingInterface(MulticastSocket sock,
                                         InetAddress[] addresses,
                                         String netInterface)
        throws SocketException {

        byte[] mask = {(byte)255, (byte)255, (byte)255, (byte)0};

        int slashIndex = netInterface.indexOf('/');
        if (slashIndex >= 0) {
            int prefixLength =
                Integer.parseInt(netInterface.substring(slashIndex + 1));
            if (prefixLength < 0 || prefixLength > 32) {
                throw new IllegalArgumentException
                    ("Invalid network prefix length: " + prefixLength);
            }
            netInterface = netInterface.substring(0, slashIndex);

            // A shift by 32 is a no-op in Java, so /0 is special-cased.
            int bits = (prefixLength == 0)
                ? 0 : (0xFFFFFFFF << (32 - prefixLength));
            mask[0] = (byte)((bits >> 24) & 0xFF);
            mask[1] = (byte)((bits >> 16) & 0xFF);
            mask[2] = (byte)((bits >> 8) & 0xFF);
            mask[3] = (byte)(bits & 0xFF);
        }

        StringTokenizer st = new StringTokenizer(netInterface, " .");
        if (st.countTokens() != 4) {
            return;
        }

        byte[] maskedNet = new byte[4];
        for (int k = 0; k < 4; k++) {
            maskedNet[k] = (byte)(Integer.parseInt(st.nextToken()) & mask[k]);
        }
        if (DEBUG) {
            Syslog.debug("net: "
                         + maskedNet[0]
                         + "." + maskedNet[1]
                         + "." + maskedNet[2]
                         + "." + maskedNet[3]);
        }

        for (int j = 0; j < addresses.length; j++) {
            byte[] testAddress = addresses[j].getAddress();
            Syslog.debug("testing: " + addresses[j].getHostAddress());

            // Only 4-byte (IPv4) addresses can be compared to the mask.
            if (testAddress.length != 4) {
                continue;
            }

            if (maskedNet[0] == (testAddress[0] & mask[0])
                && maskedNet[1] == (testAddress[1] & mask[1])
                && maskedNet[2] == (testAddress[2] & mask[2])
                && maskedNet[3] == (testAddress[3] & mask[3])) {

                sock.setInterface(addresses[j]);
                mHostName = addresses[j].getHostAddress();

                Syslog.debug("new interface IP: "
                             + addresses[j].getHostAddress());
                break;
            }
        }
    }

    /**
     * Converts a 4-byte IP address, most significant byte first, to an
     * unsigned 32-bit value held in a long.
     *
     * @return the numeric address, or -1 if the array is null or not
     * exactly 4 bytes long
     */
    public long convertIPBytes(byte[] ipBytes) {
        if (ipBytes == null || ipBytes.length != 4) {
            return -1;
        }
        // Mask each byte: Java bytes are signed, and sign extension would
        // otherwise set every higher bit for octets >= 128.
        long ipLong = ((long)(ipBytes[0] & 0xFF)) << 24;
        ipLong |= ((long)(ipBytes[1] & 0xFF)) << 16;
        ipLong |= ((long)(ipBytes[2] & 0xFF)) << 8;
        ipLong |= (long)(ipBytes[3] & 0xFF);
        return ipLong;
    }

    /**
     * Converts a dotted-quad IP string to its numeric value.
     *
     * @throws NumberFormatException if a token is not a number
     * @throws IllegalArgumentException if there are not exactly 4 tokens
     */
    public long convertIPString(String ip) throws NumberFormatException,
                                                  IllegalArgumentException {

        StringTokenizer st = new StringTokenizer(ip, ".");
        if (st.countTokens() != 4) {
            throw new IllegalArgumentException("Invalid IP string");
        }
        long ipLong = (Long.parseLong(st.nextToken()) << 24);
        ipLong += (Long.parseLong(st.nextToken()) << 16);
        ipLong += (Long.parseLong(st.nextToken()) << 8);
        ipLong += Long.parseLong(st.nextToken());
        return ipLong;
    }

    /**
     * Converts the low 32 bits of a numeric IP address back to a
     * dotted-quad string.
     */
    public String convertIPBackToString(long ip) {
        StringBuffer sb = new StringBuffer(16);
        sb.append(Long.toString((ip >> 24) & 0xFF));
        sb.append('.');
        sb.append(Long.toString((ip >> 16) & 0xFF));
        sb.append('.');
        sb.append(Long.toString((ip >> 8) & 0xFF));
        sb.append('.');
        sb.append(Long.toString(ip & 0xFF));
        return sb.toString();
    }
}