1 31 package org.objectweb.proactive.core.descriptor.data; 32 33 import org.objectweb.proactive.ProActive; 34 import org.objectweb.proactive.core.ProActiveException; 35 import org.objectweb.proactive.core.event.RuntimeRegistrationEvent; 36 import org.objectweb.proactive.core.event.RuntimeRegistrationEventListener; 37 import org.objectweb.proactive.core.node.Node; 38 import org.objectweb.proactive.core.node.NodeException; 39 import org.objectweb.proactive.core.node.NodeFactory; 40 import org.objectweb.proactive.core.node.NodeImpl; 41 import org.objectweb.proactive.core.process.ExternalProcess; 42 import org.objectweb.proactive.core.process.ExternalProcessDecorator; 43 import org.objectweb.proactive.core.process.JVMProcess; 44 import org.objectweb.proactive.core.process.globus.GlobusProcess; 45 import org.objectweb.proactive.core.process.lsf.LSFBSubProcess; 46 import org.objectweb.proactive.core.process.prun.PrunSubProcess; 47 import org.objectweb.proactive.core.runtime.ProActiveRuntime; 48 import org.objectweb.proactive.core.runtime.ProActiveRuntimeImpl; 49 import org.objectweb.proactive.core.runtime.RuntimeFactory; 50 import org.objectweb.proactive.core.util.UrlBuilder; 51 import org.objectweb.proactive.ext.security.PolicyServer; 52 53 import java.io.Serializable ; 54 55 import java.security.cert.X509Certificate ; 56 57 import java.util.Hashtable ; 58 59 60 70 public class VirtualNodeImpl extends RuntimeDeploymentProperties 71 implements VirtualNode, Serializable , RuntimeRegistrationEventListener { 72 76 77 protected transient ProActiveRuntimeImpl proActiveRuntimeImpl; 78 79 80 private String name; 81 82 83 private String property; 84 85 86 private java.util.ArrayList virtualMachines; 87 88 89 private int lastVirtualMachineIndex; 90 91 92 private java.util.ArrayList createdNodes; 93 94 95 private int lastNodeIndex; 96 97 98 private int nodeCount; 99 100 101 private int nodeCountCreated; 102 103 104 private boolean nodeCreated = false; 105 private boolean isActivated = false; 106 107 109 private Hashtable awaitedVirtualNodes; 110 private String registrationProtocol; 111 private boolean registration = false; 112 private boolean waitForTimeout = false; 113 protected int MAX_RETRY = 70; 114 private Object uniqueActiveObject = null; 115 private X509Certificate creatorCertificate; 116 private PolicyServer policyServer; 117 private String policyServerFile; 118 private String jobID = ProActive.getJobId(); 119 120 124 127 VirtualNodeImpl() { 128 } 129 130 133 VirtualNodeImpl(String name, X509Certificate creatorCertificate, 134 PolicyServer policyServer) { 135 this.name = name; 136 virtualMachines = new java.util.ArrayList (5); 137 createdNodes = new java.util.ArrayList (); 138 awaitedVirtualNodes = new Hashtable (); 139 proActiveRuntimeImpl = (ProActiveRuntimeImpl) ProActiveRuntimeImpl.getProActiveRuntime(); 140 if (logger.isDebugEnabled()) { 141 logger.debug("vn " + this.name + " registered on " + 142 proActiveRuntimeImpl.getVMInformation().getVMID().toString()); 143 } 144 proActiveRuntimeImpl.addRuntimeRegistrationEventListener(this); 145 proActiveRuntimeImpl.registerLocalVirtualNode(this, this.name); 146 this.creatorCertificate = creatorCertificate; 148 this.policyServer = policyServer; 149 } 150 151 public void setProperty(String value) { 155 this.property = value; 156 } 157 158 public String getProperty() { 159 return property; 160 } 161 162 public void setTimeout(String timeout, boolean waitForTimeout) { 163 MAX_RETRY = new Integer (timeout).intValue(); 164 this.waitForTimeout = waitForTimeout; 165 } 166 167 public void setName(String s) { 168 this.name = s; 169 } 170 171 public String getName() { 172 return name; 173 } 174 175 public void addVirtualMachine(VirtualMachine virtualMachine) { 176 virtualMachines.add(virtualMachine); 177 if (!((virtualMachine.getCreatorId()).equals(this.name))) { 178 awaitedVirtualNodes.put(virtualMachine.getCreatorId(), 180 virtualMachine); 181 } 184 if (logger.isDebugEnabled()) { 185 logger.debug("mapped VirtualNode=" + name + 186 " with VirtualMachine=" + virtualMachine.getName()); 187 } 188 } 189 190 public VirtualMachine getVirtualMachine() { 191 if (virtualMachines.isEmpty()) { 192 return null; 193 } 194 VirtualMachine vm = (VirtualMachine) virtualMachines.get(lastVirtualMachineIndex); 195 return vm; 196 } 197 198 201 public void activate() { 202 if (!isActivated) { 203 for (int i = 0; i < virtualMachines.size(); i++) { 204 VirtualMachine vm = getVirtualMachine(); 205 boolean vmAlreadyAssigned = !((vm.getCreatorId()).equals(this.name)); 206 ExternalProcess process = getProcess(vm, vmAlreadyAssigned); 207 208 if (!vmAlreadyAssigned) { 213 setParameters(process, vm); 214 process.setSecurityFile(policyServerFile); 215 try { 217 proActiveRuntimeImpl.createVM(process); 218 } catch (java.io.IOException e) { 219 e.printStackTrace(); 220 logger.error("cannot activate virtualNode " + 221 this.name + " with the process " + 222 process.getCommand()); 223 } 224 } 225 226 increaseIndex(); 231 } 232 isActivated = true; 233 if (registration) { 234 register(); 235 } 236 } else { 237 logger.info("VirtualNode " + this.name + " already activated !!!"); 238 } 239 } 240 241 245 public int getNodeCount() { 246 return nodeCount; 247 } 248 249 253 public int createdNodeCount() { 254 return nodeCountCreated; 255 } 256 257 262 public Node getNode() throws NodeException { 263 Node node; 265 waitForNodeCreation(); 266 if (!createdNodes.isEmpty()) { 267 node = (Node) createdNodes.get(lastNodeIndex); 268 increaseNodeIndex(); 269 return node; 270 } else { 271 throw new NodeException("Cannot get the node " + this.name); 272 } 273 } 274 275 public Node getNode(int index) throws NodeException { 276 Node node = (Node) createdNodes.get(index); 277 if (node == null) { 278 throw new NodeException( 279 "Cannot return the first node, no nodes hava been created"); 280 } 281 return node; 282 } 283 284 public String [] getNodesURL() throws NodeException { 285 String [] nodeNames; 286 try { 287 waitForAllNodesCreation(); 288 } catch (NodeException e) { 289 logger.error(e.getMessage()); 290 } 291 if (!createdNodes.isEmpty()) { 292 synchronized (createdNodes) { 293 nodeNames = new String [createdNodes.size()]; 294 for (int i = 0; i < createdNodes.size(); i++) { 295 nodeNames[i] = ((Node) createdNodes.get(i)).getNodeInformation() 296 .getURL(); 297 } 298 } 299 } else { 300 throw new NodeException( 301 "Cannot return nodes, no nodes hava been created"); 302 } 303 return nodeNames; 304 } 305 306 public Node[] getNodes() throws NodeException { 307 Node[] nodeTab; 308 try { 309 waitForAllNodesCreation(); 310 } catch (NodeException e) { 311 logger.error(e.getMessage()); 312 } 313 if (!createdNodes.isEmpty()) { 314 synchronized (createdNodes) { 315 nodeTab = new Node[createdNodes.size()]; 316 for (int i = 0; i < createdNodes.size(); i++) { 317 nodeTab[i] = ((Node) createdNodes.get(i)); 318 } 319 } 320 } else { 321 throw new NodeException( 322 "Cannot return nodes, no nodes hava been created"); 323 } 324 return nodeTab; 325 } 326 327 public Node getNode(String url) throws NodeException { 328 Node node = null; 329 try { 330 waitForAllNodesCreation(); 331 } catch (NodeException e) { 332 logger.error(e.getMessage()); 333 } 334 if (!createdNodes.isEmpty()) { 335 synchronized (createdNodes) { 336 for (int i = 0; i < createdNodes.size(); i++) { 337 if (((Node) createdNodes.get(i)).getNodeInformation() 338 .getURL().equals(url)) { 339 node = (Node) createdNodes.get(i); 340 break; 341 } 342 } 343 return node; 344 } 345 } else { 346 throw new NodeException( 347 "Cannot return nodes, no nodes hava been created"); 348 } 349 } 350 351 public void killAll(boolean softly) { 352 Node node; 353 ProActiveRuntime part = null; 354 if (isActivated) { 355 for (int i = 0; i < createdNodes.size(); i++) { 356 node = (Node) createdNodes.get(i); 357 part = node.getProActiveRuntime(); 358 359 if (!NodeFactory.isNodeLocal(node)) { 363 try { 364 part.killRT(softly); 365 } catch (ProActiveException e1) { 366 e1.printStackTrace(); 367 } catch (Exception e) { 368 logger.info(" Virutal Machine " + 369 part.getVMInformation().getVMID() + " on host " + 370 part.getVMInformation().getInetAddress() 371 .getCanonicalHostName() + " terminated!!!"); 372 } 373 } else { 374 try { 375 part.killNode(node.getNodeInformation().getURL()); 377 } catch (ProActiveException e) { 378 e.printStackTrace(); 379 } 380 } 381 } 382 isActivated = false; 383 try { 384 if (registration) { 386 ProActive.unregisterVirtualNode(this); 387 } 388 else { 390 proActiveRuntimeImpl.unregisterVirtualNode(this.name); 391 } 392 } catch (ProActiveException e) { 393 e.printStackTrace(); 394 } 395 396 } else { 398 proActiveRuntimeImpl.unregisterVirtualNode(this.name); 399 } 400 } 401 402 public void createNodeOnCurrentJvm(String protocol) { 403 try { 404 String url; 408 increaseNodeCount(1); 409 String nodeName = this.name + 410 Integer.toString(new java.util.Random ( 411 System.currentTimeMillis()).nextInt()); 412 413 ProActiveRuntime defaultRuntime = RuntimeFactory.getProtocolSpecificRuntime(checkProtocol( 415 protocol)); 416 417 url = defaultRuntime.createLocalNode(nodeName, false, policyServer, 419 this.getName(), ProActive.getJobId()); 420 performOperations(defaultRuntime, url, protocol); 422 } catch (Exception e) { 423 e.printStackTrace(); 424 } 425 } 426 427 public Object getUniqueAO() throws ProActiveException { 428 if (!property.equals("unique_singleAO")) { 429 logger.warn( 430 "!!!!!!!!!!WARNING. This VirtualNode is not defined with unique_single_AO property in the XML descriptor. Calling getUniqueAO() on this VirtualNode can lead to unexpected behaviour"); 431 } 432 433 if (uniqueActiveObject == null) { 434 try { 435 Node node = getNode(); 436 437 if (node.getActiveObjects().length > 1) { 438 logger.warn( 439 "!!!!!!!!!!WARNING. More than one active object is created on this VirtualNode."); 440 } 441 442 uniqueActiveObject = node.getActiveObjects()[0]; 443 } catch (Exception e) { 444 e.printStackTrace(); 445 } 446 } 447 if (uniqueActiveObject == null) { 448 throw new ProActiveException( 449 "No active object are registered on this VirtualNode"); 450 } 451 452 return uniqueActiveObject; 453 } 454 455 public boolean isActivated() { 456 return isActivated; 457 } 458 459 public String getJobID() { 463 return this.jobID; 464 } 465 466 public synchronized void runtimeRegistered(RuntimeRegistrationEvent event) { 470 String nodeName; 471 String [] nodeNames = null; 472 ProActiveRuntime proActiveRuntimeRegistered; 473 String nodeHost; 474 String protocol; 475 String url; 476 int port = 0; 477 VirtualMachine virtualMachine = null; 478 479 for (int i = 0; i < virtualMachines.size(); i++) { 480 if (((VirtualMachine) virtualMachines.get(i)).getName().equals(event.getVmName())) { 481 virtualMachine = (VirtualMachine) virtualMachines.get(i); 482 } 483 } 484 485 if ((event.getCreatorID().equals(this.name)) && 487 (virtualMachine != null)) { 488 if (logger.isDebugEnabled()) { 489 logger.debug("runtime " + event.getCreatorID() + 490 " registered on virtualnode " + this.name); 491 } 492 protocol = event.getProtocol(); 493 proActiveRuntimeRegistered = proActiveRuntimeImpl.getProActiveRuntime(event.getRegisteredRuntimeName()); 495 496 nodeHost = proActiveRuntimeRegistered.getVMInformation() 498 .getInetAddress() 499 .getCanonicalHostName(); 500 501 try { 502 port = UrlBuilder.getPortFromUrl(proActiveRuntimeRegistered.getURL()); 503 } catch (ProActiveException e) { 504 logger.warn("port unknown: " + port); 505 } 506 507 try { 508 int nodeNumber = (new Integer ((String ) virtualMachine.getNodeNumber())).intValue(); 511 for (int i = 1; i <= nodeNumber; i++) { 512 nodeName = this.name + 513 Integer.toString(new java.util.Random ( 514 System.currentTimeMillis()).nextInt()); 515 url = buildURL(nodeHost, nodeName, protocol, port); 516 proActiveRuntimeRegistered.createLocalNode(url, false, 519 policyServer, this.getName(), this.jobID); 520 performOperations(proActiveRuntimeRegistered, url, protocol); 521 } 522 } catch (ProActiveException e) { 523 e.printStackTrace(); 524 } 525 } 526 527 if (awaitedVirtualNodes.containsKey(event.getCreatorID())) { 529 System.out.println("Virtual Node ready to create node"); 531 proActiveRuntimeRegistered = proActiveRuntimeImpl.getProActiveRuntime(event.getRegisteredRuntimeName()); 532 nodeHost = proActiveRuntimeRegistered.getVMInformation() 534 .getInetAddress() 535 .getCanonicalHostName(); 536 protocol = event.getProtocol(); 537 try { 538 port = UrlBuilder.getPortFromUrl(proActiveRuntimeRegistered.getURL()); 539 } catch (ProActiveException e) { 540 logger.warn("port unknown: " + port); 541 } 542 543 VirtualMachine vm = (VirtualMachine) awaitedVirtualNodes.get(event.getCreatorID()); 545 int nodeNumber = (new Integer ((String ) vm.getNodeNumber())).intValue(); 546 for (int i = 1; i <= nodeNumber; i++) { 547 try { 548 nodeName = this.name + 549 Integer.toString(new java.util.Random ( 550 System.currentTimeMillis()).nextInt()); 551 url = buildURL(nodeHost, nodeName, protocol, port); 552 proActiveRuntimeRegistered.createLocalNode(url, false, 555 policyServer, this.getName(), this.jobID); 556 performOperations(proActiveRuntimeRegistered, url, protocol); 557 } catch (ProActiveException e) { 558 e.printStackTrace(); 559 } 560 } 561 } 562 } 563 564 568 public void setRuntimeInformations(String information, String value) 569 throws ProActiveException { 570 try { 571 checkProperty(information); 572 } catch (ProActiveException e) { 573 throw new ProActiveException("No property can be set at runtime on this VirtualNode", 574 e); 575 } 576 } 577 578 public void setRegistrationProtocol(String protocol) { 579 setRegistrationValue(true); 580 this.registrationProtocol = protocol; 581 } 582 583 public String getRegistrationProtocol() { 584 return this.registrationProtocol; 585 } 586 587 591 594 private void waitForNodeCreation() throws NodeException { 595 int count = 0; 596 while (!nodeCreated) { 597 if (count < MAX_RETRY) { 598 count++; 599 try { 600 Thread.sleep(1000); 601 } catch (InterruptedException e2) { 602 e2.printStackTrace(); 603 } 604 } else { 605 throw new NodeException( 606 "After many retries, not even one node can be found"); 607 } 608 } 609 return; 610 } 611 612 615 private void waitForAllNodesCreation() throws NodeException { 616 int count = 0; 617 618 if (waitForTimeout) { 619 while (count < MAX_RETRY) { 620 count++; 621 try { 622 Thread.sleep(1000); 623 } catch (InterruptedException e2) { 624 e2.printStackTrace(); 625 } 626 } 627 } else { 628 while (nodeCountCreated != nodeCount) { 629 if (count < MAX_RETRY) { 630 count++; 631 try { 632 Thread.sleep(1000); 633 } catch (InterruptedException e2) { 634 e2.printStackTrace(); 635 } 636 } else { 637 throw new NodeException("After many retries, only " + 638 nodeCountCreated + " nodes are created on " + 639 nodeCount + " expected"); 640 } 641 } 642 } 643 return; 644 } 645 646 651 private ExternalProcess getProcess(VirtualMachine vm, 652 boolean vmAlreadyAssigned) { 653 ExternalProcess copyProcess; 654 655 ExternalProcess process = vm.getProcess(); 657 658 if (!vmAlreadyAssigned) { 662 copyProcess = makeDeepCopy(process); 663 vm.setProcess(copyProcess); 664 return copyProcess; 665 } else { 666 increaseNodeCount(new Integer (vm.getNodeNumber()).intValue()); 668 return process; 669 } 670 } 671 672 676 private void setParameters(ExternalProcess process, VirtualMachine vm) { 677 ExternalProcess processImpl = process; 678 ExternalProcessDecorator processImplDecorator; 679 JVMProcess jvmProcess; 680 LSFBSubProcess bsub = null; 681 PrunSubProcess prun = null; 682 GlobusProcess globus = null; 683 String protocolId = ""; 684 int nodeNumber = new Integer (vm.getNodeNumber()).intValue(); 685 if (logger.isDebugEnabled()) { 686 logger.debug("nodeNumber " + nodeNumber); 687 } 688 689 while (ExternalProcessDecorator.class.isInstance(processImpl)) { 690 String processClassname = processImpl.getClass().getName(); 691 protocolId = protocolId + 692 findProtocolId(processClassname).toLowerCase(); 693 if (processImpl instanceof LSFBSubProcess) { 694 bsub = (LSFBSubProcess) processImpl; 696 increaseNodeCount((new Integer (bsub.getProcessorNumber()).intValue()) * nodeNumber); 697 } 698 if (processImpl instanceof PrunSubProcess) { 699 prun = (PrunSubProcess) processImpl; 701 if (logger.isDebugEnabled()) { 702 logger.debug("VirtualNodeImpl getHostsNumber() " + 703 prun.getHostsNumber()); 704 logger.debug("VirtualNodeImpl getnodeNumber() " + 705 prun.getProcessorPerNodeNumber()); 706 logger.debug("VM " + vm); 707 } 708 709 increaseNodeCount((new Integer (prun.getHostsNumber()).intValue()) * nodeNumber); 710 } 711 if (processImpl instanceof GlobusProcess) { 712 globus = (GlobusProcess) processImpl; 714 increaseNodeCount((new Integer (globus.getCount()).intValue()) * nodeNumber); 715 } 716 717 processImplDecorator = (ExternalProcessDecorator) processImpl; 718 processImpl = processImplDecorator.getTargetProcess(); 719 if (logger.isDebugEnabled()) { 720 logger.debug("processImplDecorator " + 721 processImplDecorator.getClass().getName()); 722 } 723 } 724 protocolId = protocolId + "jvm"; 725 jvmProcess = (JVMProcess) processImpl; 728 if (jvmProcess.getClassname().equals("org.objectweb.proactive.core.runtime.StartRuntime")) { 730 if ((bsub == null) && (prun == null) && (globus == null)) { 732 increaseNodeCount(nodeNumber); 734 } 735 736 String vnName = this.name; 738 739 String localruntimeURL = null; 740 try { 741 localruntimeURL = RuntimeFactory.getDefaultRuntime().getURL(); 742 } catch (ProActiveException e) { 743 e.printStackTrace(); 744 } 745 746 if (logger.isDebugEnabled()) { 747 logger.debug(localruntimeURL); 748 } 749 jvmProcess.setJvmOptions("-Dproactive.jobid=" + this.jobID); 750 jvmProcess.setParameters(vnName + " " + localruntimeURL + " " + 751 nodeNumber + " " + protocolId + " " + vm.getName()); 752 } 753 } 754 755 759 private String findProtocolId(String processClassname) { 760 int index = processClassname.lastIndexOf(".") + 1; 761 int lastIndex = processClassname.lastIndexOf("Process"); 762 return processClassname.substring(index, lastIndex) + "-"; 763 } 764 765 770 private ExternalProcess makeDeepCopy(ExternalProcess process) { 771 ExternalProcess result = null; 773 try { 774 java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream (); 775 java.io.ObjectOutputStream oos = new java.io.ObjectOutputStream (baos); 776 oos.writeObject(process); 777 oos.flush(); 778 oos.close(); 779 java.io.ByteArrayInputStream bais = new java.io.ByteArrayInputStream (baos.toByteArray()); 780 java.io.ObjectInputStream ois = new java.io.ObjectInputStream (bais); 781 result = (ExternalProcess) ois.readObject(); 782 ois.close(); 783 } catch (Exception e) { 784 e.printStackTrace(); 785 } 786 787 return result; 789 } 790 791 private String buildURL(String host, String name, String protocol, int port) { 792 if (port != 0) { 793 return UrlBuilder.buildUrl(host, name, protocol, port); 794 } else { 795 return UrlBuilder.buildUrl(host, name, protocol); 796 } 797 } 798 799 private void increaseIndex() { 800 if (virtualMachines.size() > 1) { 801 lastVirtualMachineIndex = (lastVirtualMachineIndex + 1) % virtualMachines.size(); 802 } 803 } 804 805 private void increaseNodeCount(int n) { 806 nodeCount = nodeCount + n; 807 if (logger.isDebugEnabled()) { 808 logger.debug("NodeCount: " + nodeCount); 809 } 810 } 811 812 private void increaseNodeIndex() { 813 if (createdNodes.size() > 1) { 814 lastNodeIndex = (lastNodeIndex + 1) % createdNodes.size(); 815 } 816 } 817 818 private String checkProtocol(String protocol) { 819 if (protocol.indexOf(":") == -1) { 820 return protocol.concat(":"); 821 } 822 return protocol; 823 } 824 825 private void performOperations(ProActiveRuntime part, String url, 826 String protocol) { 827 createdNodes.add(new NodeImpl(part, url, checkProtocol(protocol), 828 this.jobID)); 829 logger.info("**** Mapping VirtualNode " + this.name + " with Node: " + 830 url + " done"); 831 nodeCreated = true; 832 nodeCountCreated++; 833 } 834 835 private void register() { 836 try { 837 waitForAllNodesCreation(); 838 ProActive.registerVirtualNode(this, registrationProtocol, false); 841 } catch (NodeException e) { 842 logger.error(e.getMessage()); 843 } catch (ProActiveException e) { 844 e.printStackTrace(); 845 } 846 } 847 848 private void setRegistrationValue(boolean value) { 849 this.registration = value; 850 } 851 852 private void writeObject(java.io.ObjectOutputStream out) 853 throws java.io.IOException { 854 try { 855 waitForAllNodesCreation(); 856 } catch (NodeException e) { 857 e.printStackTrace(); 858 } 859 860 out.defaultWriteObject(); 861 } 862 863 private void readObject(java.io.ObjectInputStream in) 864 throws java.io.IOException , ClassNotFoundException { 865 in.defaultReadObject(); 866 } 867 868 870 873 public X509Certificate getCreatorCertificate() { 874 return creatorCertificate; 875 } 876 877 880 public PolicyServer getPolicyServer() { 881 return policyServer; 882 } 883 884 887 public void setPolicyServer(PolicyServer server) { 888 policyServer = server; 890 } 891 892 895 public void setPolicyFile(String file) { 896 policyServerFile = file; 897 } 898 } 899 | Popular Tags |