1 22 package org.jboss.ha.jndi; 23 24 import java.io.IOException ; 25 import java.io.ObjectOutputStream ; 26 import java.io.OutputStream ; 27 import java.lang.reflect.InvocationTargetException ; 28 import java.lang.reflect.Method ; 29 import java.lang.reflect.UndeclaredThrowableException ; 30 import java.net.DatagramPacket ; 31 import java.net.InetAddress ; 32 import java.net.MulticastSocket ; 33 import java.net.ServerSocket ; 34 import java.net.Socket ; 35 import java.net.UnknownHostException ; 36 import java.rmi.MarshalledObject ; 37 import java.util.Collections ; 38 import java.util.HashMap ; 39 import java.util.Iterator ; 40 import java.util.Map ; 41 import java.util.Set ; 42 43 import javax.management.ObjectInstance ; 44 import javax.management.ObjectName ; 45 import javax.management.Query ; 46 import javax.management.QueryExp ; 47 import javax.net.ServerSocketFactory; 48 49 import org.jboss.cache.Cache; 50 import org.jboss.ha.framework.interfaces.HAPartition; 51 import org.jboss.ha.framework.server.ClusterPartitionMBean; 52 import org.jboss.invocation.Invocation; 53 import org.jboss.invocation.MarshalledInvocation; 54 import org.jboss.logging.Logger; 55 import org.jboss.mx.util.MBeanProxyExt; 56 import org.jboss.system.ServiceMBeanSupport; 57 import org.jboss.system.server.ServerConfigUtil; 58 import org.jboss.util.threadpool.BasicThreadPool; 59 import org.jboss.util.threadpool.BasicThreadPoolMBean; 60 import org.jboss.util.threadpool.ThreadPool; 61 import org.jnp.interfaces.Naming; 62 import org.jnp.interfaces.NamingContext; 63 64 73 public class DetachedHANamingService 74 extends ServiceMBeanSupport 75 implements DetachedHANamingServiceMBean 76 { 77 79 83 protected ServerSocket bootstrapSocket; 84 85 88 protected HAJNDI theServer; 89 92 protected Map marshalledInvocationMapping; 93 96 protected Naming stub; 97 100 protected HAPartition partition; 101 104 protected ClusterPartitionMBean clusterPartition; 105 108 protected Cache cache; 109 112 protected String partitionName = ServerConfigUtil.getDefaultPartitionName(); 113 116 private ObjectName proxyFactory; 117 118 122 protected InetAddress bindAddress; 123 126 protected int backlog = 50; 127 131 protected int port = 1100; 132 133 136 protected String adGroupAddress = NamingContext.DEFAULT_DISCOVERY_GROUP_ADDRESS; 137 140 protected int adGroupPort = NamingContext.DEFAULT_DISCOVERY_GROUP_PORT; 141 144 protected InetAddress discoveryBindAddress; 145 146 protected AutomaticDiscovery autoDiscovery = null; 147 148 protected boolean discoveryDisabled = false; 149 150 protected int autoDiscoveryTTL = 16; 151 154 protected ServerSocketFactory jnpServerSocketFactory; 155 158 protected String jnpServerSocketFactoryName; 159 160 163 protected ThreadPool lookupPool; 164 165 167 public DetachedHANamingService() 168 { 169 } 171 172 177 public Map getMethodMap() 178 { 179 return marshalledInvocationMapping; 180 } 181 182 public String getPartitionName() 183 { 184 return partitionName; 185 } 186 187 public void setPartitionName(final String partitionName) 188 { 189 this.partitionName = partitionName; 190 } 191 192 public ClusterPartitionMBean getClusterPartition() 193 { 194 return clusterPartition; 195 } 196 197 public void setClusterPartition(ClusterPartitionMBean clusterPartition) 198 { 199 this.clusterPartition = clusterPartition; 200 } 201 202 public ObjectName getProxyFactoryObjectName() 203 { 204 return proxyFactory; 205 } 206 207 public void setProxyFactoryObjectName(ObjectName proxyFactory) 208 { 209 this.proxyFactory = proxyFactory; 210 } 211 212 public void setPort(int p) 213 { 214 port = p; 215 } 216 217 public int getPort() 218 { 219 return port; 220 } 221 222 public String getBindAddress() 223 { 224 String address = null; 225 if (bindAddress != null) 226 address = bindAddress.getHostAddress(); 227 return address; 228 } 229 230 public void setBindAddress(String host) throws java.net.UnknownHostException 231 { 232 bindAddress = InetAddress.getByName(host); 233 } 234 235 public int getBacklog() 236 { 237 return backlog; 238 } 239 240 public void setBacklog(int backlog) 241 { 242 if (backlog <= 0) 243 backlog = 50; 244 this.backlog = backlog; 245 } 246 247 public void setDiscoveryDisabled(boolean disable) 248 { 249 this.discoveryDisabled = disable; 250 } 251 252 public boolean getDiscoveryDisabled() 253 { 254 return this.discoveryDisabled; 255 } 256 257 public String getAutoDiscoveryAddress() 258 { 259 return this.adGroupAddress; 260 } 261 262 public void setAutoDiscoveryAddress(String adAddress) 263 { 264 this.adGroupAddress = adAddress; 265 } 266 267 public int getAutoDiscoveryGroup() 268 { 269 return this.adGroupPort; 270 } 271 public void setAutoDiscoveryGroup(int adGroup) 272 { 273 this.adGroupPort = adGroup; 274 } 275 276 public String getAutoDiscoveryBindAddress() 277 { 278 String address = null; 279 if (discoveryBindAddress != null) 280 address = discoveryBindAddress.getHostAddress(); 281 return address; 282 } 283 public void setAutoDiscoveryBindAddress(String address) 284 throws UnknownHostException 285 { 286 discoveryBindAddress = InetAddress.getByName(address); 287 } 288 289 public int getAutoDiscoveryTTL() 290 { 291 return autoDiscoveryTTL; 292 } 293 294 public void setAutoDiscoveryTTL(int ttl) 295 { 296 autoDiscoveryTTL = ttl; 297 } 298 299 public void setJNPServerSocketFactory(String factoryClassName) 300 throws ClassNotFoundException , InstantiationException , IllegalAccessException 301 { 302 this.jnpServerSocketFactoryName = factoryClassName; 303 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 304 Class clazz = loader.loadClass(jnpServerSocketFactoryName); 305 jnpServerSocketFactory = (ServerSocketFactory) clazz.newInstance(); 306 } 307 308 public void setLookupPool(BasicThreadPoolMBean poolMBean) 309 { 310 lookupPool = poolMBean.getInstance(); 311 } 312 313 public void startService(HAPartition haPartition) 314 throws Exception 315 { 316 this.partition = haPartition; 317 this.startService(); 318 } 319 320 protected void createService() 321 throws Exception 322 { 323 partition = clusterPartition.getHAPartition(); 324 partitionName = partition.getPartitionName(); 325 326 if (partition == null) 327 throw new IllegalStateException ("Cannot find partition '" + partitionName + "'"); 328 329 cache = clusterPartition.getClusteredCache(); 330 331 if (log.isDebugEnabled()) 332 log.debug("Initializing HAJNDI server on partition: " + partitionName); 333 334 theServer = new HAJNDI(partition, cache); 336 log.debug("initialize HAJNDI"); 337 339 HashMap tmpMap = new HashMap (13); 341 Method [] methods = Naming.class.getMethods(); 342 for (int m = 0; m < methods.length; m++) 343 { 344 Method method = methods[m]; 345 Long hash = new Long (MarshalledInvocation.calculateHash(method)); 346 tmpMap.put(hash, method); 347 } 348 marshalledInvocationMapping = Collections.unmodifiableMap(tmpMap); 349 350 NamingContext.setHANamingServerForPartition(partitionName, theServer); 352 } 353 354 protected void startService() 355 throws Exception 356 { 357 log.debug("Obtaining the transport proxy"); 358 stub = this.getNamingProxy(); 359 this.theServer.setHAStub(stub); 360 if (port >= 0) 361 { 362 log.debug("Starting HAJNDI bootstrap listener"); 363 initBootstrapListener(); 364 } 365 366 if (adGroupAddress != null && discoveryDisabled == false) 368 { 369 try 370 { 371 autoDiscovery = new AutomaticDiscovery(); 372 autoDiscovery.start(); 373 lookupPool.run(autoDiscovery); 374 } 375 catch (Exception e) 376 { 377 log.warn("Failed to start AutomaticDiscovery", e); 378 } 379 } 380 log.debug("initializing HAJNDI TreeCache"); 381 theServer.init(); 382 } 383 384 protected void stopService() throws Exception 385 { 386 NamingContext.removeHANamingServerForPartition(partitionName); 388 389 ServerSocket s = bootstrapSocket; 391 bootstrapSocket = null; 392 if (s != null) 393 { 394 log.debug("Closing the bootstrap listener"); 395 s.close(); 396 } 397 398 log.debug("Stopping the HAJNDI service"); 400 theServer.stop(); 401 402 log.debug("Stopping AutomaticDiscovery"); 403 if (autoDiscovery != null && discoveryDisabled == false) 404 autoDiscovery.stop(); 405 } 406 407 protected void destroyService() throws Exception 408 { 409 log.debug("Destroying the HAJNDI service"); 410 theServer.destroy(); 411 } 412 413 420 public Object invoke(Invocation invocation) throws Exception 421 { 422 if (invocation instanceof MarshalledInvocation) 424 { 425 MarshalledInvocation mi = (MarshalledInvocation) invocation; 426 mi.setMethodMap(marshalledInvocationMapping); 427 } 428 Method method = invocation.getMethod(); 430 Object [] args = invocation.getArguments(); 431 Object value = null; 432 try 433 { 434 value = method.invoke(theServer, args); 435 } 436 catch (InvocationTargetException e) 437 { 438 Throwable t = e.getTargetException(); 439 if (t instanceof Exception ) 440 throw (Exception ) t; 441 else 442 throw new UndeclaredThrowableException (t, method.toString()); 443 } 444 445 return value; 446 } 447 448 451 protected void initBootstrapListener() 452 { 453 try 455 { 456 if (jnpServerSocketFactory == null) 458 jnpServerSocketFactory = ServerSocketFactory.getDefault(); 459 bootstrapSocket = jnpServerSocketFactory.createServerSocket(port, backlog, bindAddress); 460 if (port == 0) 462 port = bootstrapSocket.getLocalPort(); 463 String msg = "Started ha-jndi bootstrap jnpPort=" + port 464 + ", backlog=" + backlog + ", bindAddress=" + bindAddress; 465 log.info(msg); 466 } 467 catch (IOException e) 468 { 469 log.error("Could not start on port " + port, e); 470 } 471 472 if (lookupPool == null) 473 lookupPool = new BasicThreadPool("HANamingBootstrap Pool"); 474 AcceptHandler handler = new AcceptHandler(); 475 lookupPool.run(handler); 476 } 477 478 480 protected HAPartition findHAPartitionWithName(String name) throws Exception 481 { 482 HAPartition result = null; 483 QueryExp matchName = Query.match(Query.attr("Name"), 487 Query.value("ClusterPartition")); 488 QueryExp matchPartitionName = Query.match(Query.attr("PartitionName"), 489 Query.value(name)); 490 QueryExp exp = Query.and(matchName, matchPartitionName); 491 Set mbeans = this.getServer().queryMBeans(null, exp); 492 if (mbeans != null && mbeans.size() > 0) 493 { 494 for (Iterator iter = mbeans.iterator(); iter.hasNext();) 495 { 496 ObjectInstance inst = (ObjectInstance ) iter.next(); 497 try 498 { 499 ClusterPartitionMBean cp = 500 (ClusterPartitionMBean) MBeanProxyExt.create( 501 ClusterPartitionMBean.class, 502 inst.getObjectName(), 503 this.getServer()); 504 result = cp.getHAPartition(); 505 break; 506 } 507 catch (Exception e) {} 508 } 509 } 510 511 512 return result; 513 } 514 515 521 protected Naming getNamingProxy() throws Exception 522 { 523 Naming proxy = (Naming) server.getAttribute(proxyFactory, "Proxy"); 524 return proxy; 525 } 526 527 529 private class AutomaticDiscovery 530 implements Runnable 531 { 532 protected Logger log = Logger.getLogger(AutomaticDiscovery.class); 533 534 protected MulticastSocket socket = null; 535 536 protected byte[] ipAddress = null; 537 538 protected InetAddress group = null; 539 protected boolean stopping = false; 540 protected Thread receiverThread = null; 542 protected boolean receiverStopped = true; 543 544 public AutomaticDiscovery() throws Exception 545 { 546 } 547 548 public void start() throws Exception 549 { 550 stopping = false; 551 if (discoveryBindAddress == null) 553 discoveryBindAddress = bindAddress; 554 socket = new MulticastSocket (adGroupPort); 555 if (discoveryBindAddress != null && discoveryBindAddress.isAnyLocalAddress() == false) 557 { 558 socket.setInterface(discoveryBindAddress); 559 } 560 socket.setTimeToLive(autoDiscoveryTTL); 561 group = InetAddress.getByName(adGroupAddress); 562 socket.joinGroup(group); 563 564 String address = getBindAddress(); 565 569 if (address == null || address.equals("0.0.0.0")) 570 { 571 address = InetAddress.getLocalHost().getHostAddress(); 572 } 573 ipAddress = (address + ":" + port).getBytes(); 574 575 log.info("Listening on " + socket.getInterface() + ":" + socket.getLocalPort() 576 + ", group=" + adGroupAddress 577 + ", HA-JNDI address=" + new String (ipAddress)); 578 } 579 580 public void stop() 581 { 582 try 583 { 584 stopping = true; 585 586 if (receiverThread != null 588 && receiverThread != Thread.currentThread() 589 && receiverThread.isInterrupted() == false) 590 { 591 receiverThread.join(5); 593 if (!receiverStopped) 594 receiverThread.interrupt(); } 596 597 socket.leaveGroup(group); 598 socket.close(); 599 } 600 catch (Exception ex) 601 { 602 log.error("Stopping AutomaticDiscovery failed", ex); 603 } 604 } 605 606 public void run() 607 { 608 boolean trace = log.isTraceEnabled(); 609 log.debug("Discovery request thread begin"); 610 611 receiverThread = Thread.currentThread(); 614 615 receiverStopped = false; 616 617 while (true) 619 { 620 if (stopping) 622 break; 623 try 624 { 625 if (trace) 626 log.trace("HA-JNDI AutomaticDiscovery waiting for queries..."); 627 byte[] buf = new byte[256]; 628 DatagramPacket packet = new DatagramPacket (buf, buf.length); 629 socket.receive(packet); 630 if (trace) 631 log.trace("HA-JNDI AutomaticDiscovery Packet received."); 632 633 DiscoveryRequestHandler handler = new DiscoveryRequestHandler(log, 635 packet, socket, ipAddress); 636 lookupPool.run(handler); 637 if (trace) 638 log.trace("Queued DiscoveryRequestHandler"); 639 } 640 catch (Throwable t) 641 { 642 if (stopping == false) 643 log.warn("Ignored error while processing HAJNDI discovery request:", t); 644 } 645 } 646 receiverStopped = true; 647 log.debug("Discovery request thread end"); 648 } 649 } 650 651 654 private class DiscoveryRequestHandler implements Runnable 655 { 656 private Logger log; 657 private MulticastSocket socket; 658 private DatagramPacket packet; 659 private byte[] ipAddress; 660 661 DiscoveryRequestHandler(Logger log, DatagramPacket packet, 662 MulticastSocket socket, byte[] ipAddress) 663 { 664 this.log = log; 665 this.packet = packet; 666 this.socket = socket; 667 this.ipAddress = ipAddress; 668 } 669 public void run() 670 { 671 boolean trace = log.isTraceEnabled(); 672 if( trace ) 673 log.trace("DiscoveryRequestHandler begin"); 674 try 676 { 677 String requestData = new String (packet.getData()).trim(); 679 if( trace ) 680 log.trace("RequestData: "+requestData); 681 int colon = requestData.indexOf(':'); 682 if (colon > 0) 683 { 684 String name = requestData.substring(colon + 1); 686 if (name.equals(partitionName) == false) 687 { 688 log.debug("Ignoring discovery request for partition: " + name); 689 if( trace ) 690 log.trace("DiscoveryRequestHandler end"); 691 return; 692 } 693 } 694 DatagramPacket p = new DatagramPacket (ipAddress, ipAddress.length, 695 packet.getAddress(), packet.getPort()); 696 if (trace) 697 log.trace("Sending AutomaticDiscovery answer: " + new String (ipAddress)); 698 socket.send(p); 699 if (trace) 700 log.trace("AutomaticDiscovery answer sent."); 701 } 702 catch (IOException ex) 703 { 704 log.error("Error writing response", ex); 705 } 706 if( trace ) 707 log.trace("DiscoveryRequestHandler end"); 708 } 709 } 710 711 714 private class AcceptHandler implements Runnable 715 { 716 public void run() 717 { 718 boolean trace = log.isTraceEnabled(); 719 while (bootstrapSocket != null) 720 { 721 Socket socket = null; 722 try 724 { 725 socket = bootstrapSocket.accept(); 726 if( trace ) 727 log.trace("Accepted bootstrap client: "+socket); 728 BootstrapRequestHandler handler = new BootstrapRequestHandler(socket); 729 lookupPool.run(handler); 730 } 731 catch (IOException e) 732 { 733 if (bootstrapSocket == null) 735 return; 736 log.error("Naming accept handler stopping", e); 737 } 738 catch(Throwable e) 739 { 740 log.error("Unexpected exception during accept", e); 741 } 742 } 743 } 744 } 745 746 749 private class BootstrapRequestHandler implements Runnable 750 { 751 private Socket socket; 752 BootstrapRequestHandler(Socket socket) 753 { 754 this.socket = socket; 755 } 756 public void run() 757 { 758 try 760 { 761 OutputStream os = socket.getOutputStream(); 762 ObjectOutputStream out = new ObjectOutputStream (os); 763 MarshalledObject replyStub = new MarshalledObject (stub); 764 out.writeObject(replyStub); 765 out.close(); 766 } 767 catch (IOException ex) 768 { 769 log.debug("Error writing response to " + socket, ex); 770 } 771 finally 772 { 773 try 774 { 775 socket.close(); 776 } 777 catch (IOException e) 778 { 779 } 780 } 781 } 782 } 783 } 784 | Popular Tags |