1 18 package net.sf.drftpd.master; 19 20 import java.io.FileNotFoundException ; 21 import java.io.FileReader ; 22 import java.io.FileWriter ; 23 import java.io.IOException ; 24 import java.lang.reflect.Constructor ; 25 import java.net.InetAddress ; 26 import java.rmi.RemoteException ; 27 import java.rmi.registry.LocateRegistry ; 28 import java.rmi.registry.Registry ; 29 import java.rmi.server.RMIClientSocketFactory ; 30 import java.rmi.server.RMIServerSocketFactory ; 31 import java.rmi.server.RMISocketFactory ; 32 import java.rmi.server.RemoteServer ; 33 import java.rmi.server.UnicastRemoteObject ; 34 import java.util.ArrayList ; 35 import java.util.Collection ; 36 import java.util.Collections ; 37 import java.util.HashMap ; 38 import java.util.HashSet ; 39 import java.util.Hashtable ; 40 import java.util.Iterator ; 41 import java.util.List ; 42 import java.util.ListIterator ; 43 import java.util.Properties ; 44 import java.util.Set ; 45 46 import net.sf.drftpd.FatalException; 47 import net.sf.drftpd.NoAvailableSlaveException; 48 import net.sf.drftpd.ObjectNotFoundException; 49 import net.sf.drftpd.SlaveUnavailableException; 50 import net.sf.drftpd.event.SlaveEvent; 51 import net.sf.drftpd.master.usermanager.UserFileException; 52 import net.sf.drftpd.remotefile.LinkedRemoteFile; 53 import net.sf.drftpd.remotefile.MLSTSerialize; 54 import net.sf.drftpd.slave.Slave; 55 import net.sf.drftpd.slave.SlaveStatus; 56 import net.sf.drftpd.util.SafeFileWriter; 57 58 import org.apache.log4j.Level; 59 import org.apache.log4j.Logger; 60 import org.drftpd.slave.socket.SocketSlaveImpl; 61 import org.drftpd.slaveselection.SlaveSelectionManagerInterface; 62 import org.jdom.Document; 63 import org.jdom.Element; 64 import org.jdom.JDOMException; 65 import org.jdom.input.SAXBuilder; 66 import org.jdom.output.XMLOutputter; 67 68 72 public class SlaveManagerImpl 73 extends UnicastRemoteObject 74 implements SlaveManager { 75 private static final Logger logger = 76 Logger.getLogger(SlaveManagerImpl.class.getName()); 77 78 81 95 110 122 public static List loadRSlaves() 123 { 124 if (_self == null) return null; 125 return _self.loadSlaves(); 126 } 127 128 private Properties elementToProps(Element config) 129 { 130 Properties props = new Properties (); 131 String masks = ""; 132 List maskElements = config.getChildren("mask"); 133 for (Iterator i2 = maskElements.iterator(); i2.hasNext();) { 134 if (!masks.equals("")) masks += ","; 135 masks += ((Element) i2.next()).getText(); 136 } 137 props.put("masks", masks); 138 for (Iterator i = config.getChildren().iterator(); i.hasNext();) { 139 Element e = (Element) i.next(); 140 if (e.getName().equalsIgnoreCase("mask")) continue; 141 try { 142 props.setProperty(e.getName(), e.getText()); 143 } catch (Exception e1) { 144 } 145 } 146 return props; 147 } 148 149 public List loadSlaves() { 150 ArrayList rslaves; 151 try { 152 Document doc = 153 new SAXBuilder().build(new FileReader ("conf/slaves.xml")); 154 List children = doc.getRootElement().getChildren("slave"); 155 rslaves = new ArrayList (children.size()); 156 for (Iterator i = children.iterator(); i.hasNext();) { 157 Properties props = elementToProps((Element) i.next()); 159 rslaves.add(new RemoteSlave(props)); 160 } 161 rslaves.trimToSize(); 162 } catch (Exception ex) { 163 throw new FatalException(ex); 165 } 166 Collections.sort(rslaves); 167 return rslaves; 168 } 169 170 public static Collection rslavesToMasks(Collection rslaves) { 171 if (_self != null) return _self.getMasks(rslaves); 172 return null; 173 } 174 175 public Collection getMasks(Collection rslaves) { 176 ArrayList masks = new ArrayList (); 177 for (Iterator iter = rslaves.iterator(); iter.hasNext();) { 178 RemoteSlave rslave2 = (RemoteSlave) iter.next(); 179 masks.addAll(rslave2.getMasks()); 180 } 181 return masks; 182 } 183 184 187 204 public void setRSlavesManager() { 205 for (Iterator iter = _rslaves.iterator(); iter.hasNext();) { 206 RemoteSlave rslave = (RemoteSlave) iter.next(); 207 rslave.setManager(this); 208 } 209 } 210 211 private ConnectionManager _cm; 212 213 protected List _rslaves; 214 protected RMIServerSocketFactory _ssf; 215 protected RMIClientSocketFactory _csf; 216 217 private SlaveSelectionManagerInterface _slaveSelectionManager; 218 219 private static SlaveManagerImpl _self = null; 220 221 protected SlaveManagerImpl() throws RemoteException { 222 _self = this; 223 } 224 225 public void init( 226 Properties cfg, 227 List rslaves, 228 RMIServerSocketFactory ssf, 229 ConnectionManager cm) 230 throws RemoteException 231 { 232 _csf = RMISocketFactory.getSocketFactory(); 233 _ssf = ssf; 234 _cm = cm; 235 _rslaves = rslaves; 236 237 setRSlavesManager(); 238 239 Registry registry = 240 LocateRegistry.createRegistry( 241 Integer.parseInt(cfg.getProperty("master.bindport", "1099")), 242 _csf, _ssf); 243 try { 245 registry.bind( 246 cfg.getProperty("master.bindname", "slavemanager"), 247 this); 248 } catch (Exception t) { 249 throw new FatalException(t); 250 } 251 try { 252 Constructor c = 253 Class 254 .forName( 255 cfg.getProperty( 256 "slaveselection", 257 "org.drftpd.slaveselection.def.SlaveSelectionManager")) 258 .getConstructor(new Class [] { SlaveManagerImpl.class }); 259 _slaveSelectionManager = 260 (SlaveSelectionManagerInterface) c.newInstance( 261 new Object [] { this }); 262 } catch (Exception e) { 263 if (e instanceof RuntimeException ) 264 throw (RuntimeException ) e; 265 throw new FatalException(e); 266 } 267 logger.debug("starting slavestatus updater thread"); 268 new SlaveStatusUpdater().start(); 270 } 271 272 protected void addShutdownHook() { 273 Runtime.getRuntime().addShutdownHook(new Thread () { 275 public void run() { 276 logger.info("Running shutdown hook"); 277 saveFilelist(); 278 try { 279 getConnectionManager().getUserManager().saveAll(); 280 } catch (UserFileException e) { 281 logger.warn("", e); 282 } 283 } 284 }); 285 } 286 287 public void addSlave( 288 String slaveName, 289 Slave slave, 290 SlaveStatus status, 291 int maxPath) 292 throws RemoteException { 293 294 slave.ping(); 295 296 RemoteSlave rslave = null; 297 for (Iterator iter = _rslaves.iterator(); iter.hasNext();) { 298 RemoteSlave rslave2 = (RemoteSlave) iter.next(); 299 if (rslave2.getName().equals(slaveName)) { 300 rslave = rslave2; 301 break; 302 } 303 } 304 if (rslave == null) 305 throw new IllegalArgumentException ("Slave not found in slaves.xml"); 306 307 if (rslave.isAvailablePing()) { 308 throw new IllegalArgumentException ( 309 rslave.getName() + " is already online"); 310 } 311 312 try { 313 InetAddress addr = null; 314 if (slave instanceof SocketSlaveImpl) { 315 addr = ((SocketSlaveImpl) slave).getPeerAddress(); 316 } 317 if (addr == null) { 318 addr = InetAddress.getByName(RemoteServer.getClientHost()); 319 } 320 321 if (addr == null) { 323 throw new IllegalArgumentException ( 324 rslave.getName() + " has no slave address"); 325 } 326 rslave.setSlave(slave, addr, slave.getSlaveStatus(), maxPath); 327 } catch (Throwable e1) { 328 throw new FatalException(e1); 329 } 330 logger.debug("About to remerge(), slave is " + rslave); 331 try { 332 remerge(rslave); 333 } catch (IOException e) { 334 logger.warn("", e); 335 rslave.setOffline("IOException during remerge()"); 336 return; 337 } catch (SlaveUnavailableException e) { 338 logger.warn("", e); 339 rslave.setOffline("Slave Unavailable during remerge()"); 340 return; 341 } 342 343 logger.info("Slave added: '" + rslave.getName() + "' status: " + status); 344 345 getConnectionManager().dispatchFtpEvent( 346 new SlaveEvent("ADDSLAVE", rslave)); 347 } 348 349 public void delSlave(String slaveName, String reason) 350 throws RemoteException { 351 352 RemoteSlave rslave = null; 353 for (Iterator iter = _rslaves.iterator(); iter.hasNext();) { 354 RemoteSlave rslave2 = (RemoteSlave) iter.next(); 355 if (rslave2.getName().equals(slaveName)) { 356 rslave = rslave2; 357 break; 358 } 359 } 360 if (rslave == null) 361 throw new IllegalArgumentException ( 362 "Slave not found in slaves.xml (" + slaveName + ")"); 363 364 rslave.setOffline(reason); 365 } 366 367 public HashSet findSlavesBySpace(int numOfSlaves, Set exemptSlaves, boolean ascending) { 368 Collection slaveList = 369 getConnectionManager().getSlaveManager().getSlaveList(); 370 HashMap map = new HashMap (); 371 for (Iterator iter = slaveList.iterator(); iter.hasNext();) { 372 RemoteSlave rslave = (RemoteSlave) iter.next(); 373 if (exemptSlaves.contains(rslave)) 374 continue; 375 Long size; 376 try { 377 size = new Long (rslave.getStatus().getDiskSpaceAvailable()); 378 } catch (SlaveUnavailableException e) { 379 continue; 380 } 381 map.put(size,rslave); 382 } 383 ArrayList sorted = new ArrayList (map.keySet()); 384 if (ascending) { 385 Collections.sort(sorted); 386 } else { 387 Collections.sort(sorted, Collections.reverseOrder()); 388 } 389 HashSet returnMe = new HashSet (); 390 for (ListIterator iter = sorted.listIterator(); iter.hasNext();) { 391 if (iter.nextIndex()==numOfSlaves) 392 break; 393 Long key = (Long ) iter.next(); 394 RemoteSlave rslave = (RemoteSlave) map.get(key); 395 returnMe.add(rslave); 396 } 397 return returnMe; 398 } 399 400 401 public RemoteSlave findSmallestFreeSlave() { 402 Collection slaveList = 403 getConnectionManager().getSlaveManager().getSlaveList(); 404 long smallSize = Integer.MAX_VALUE; 405 RemoteSlave smallSlave = null; 406 for (Iterator iter = slaveList.iterator(); iter.hasNext();) { 407 RemoteSlave rslave = (RemoteSlave) iter.next(); 408 long size = Integer.MAX_VALUE; 409 try { 410 size = rslave.getStatus().getDiskSpaceAvailable(); 411 } catch (SlaveUnavailableException e) { 412 continue; 413 } 414 if (size < smallSize) { 415 smallSize = size; 416 smallSlave = rslave; 417 } 418 } 419 return smallSlave; 420 } 421 422 425 public SlaveStatus getAllStatus() { 426 SlaveStatus allStatus = new SlaveStatus(); 427 for (Iterator iter = getSlaves().iterator(); iter.hasNext();) { 428 RemoteSlave rslave = (RemoteSlave) iter.next(); 429 try { 430 allStatus = allStatus.append(rslave.getStatus()); 431 } catch (SlaveUnavailableException e) { 432 } 434 } 435 return allStatus; 436 } 437 438 458 public Collection getAvailableSlaves() throws NoAvailableSlaveException { 459 ArrayList availableSlaves = new ArrayList (); 460 for (Iterator iter = getSlaves().iterator(); iter.hasNext();) { 461 RemoteSlave rslave = (RemoteSlave) iter.next(); 462 if (!rslave.isAvailable()) 463 continue; 464 availableSlaves.add(rslave); 465 } 466 if (availableSlaves.isEmpty()) { 467 throw new NoAvailableSlaveException("No slaves online"); 468 } 469 return availableSlaves; 470 } 471 472 public ConnectionManager getConnectionManager() { 473 return _cm; 474 } 475 476 public RemoteSlave getSlave(String s) throws ObjectNotFoundException { 477 for (Iterator iter = getSlaves().iterator(); iter.hasNext();) { 478 RemoteSlave rslave = (RemoteSlave) iter.next(); 479 if (rslave.getName().equals(s)) 480 return rslave; 481 } 482 throw new ObjectNotFoundException(s + ": No such slave"); 483 } 484 485 public List getSlaveList() { 486 return _rslaves; 487 } 488 489 public Collection getSlaves() { 490 return _rslaves; 491 } 492 493 public SlaveSelectionManagerInterface getSlaveSelectionManager() { 494 return _slaveSelectionManager; 495 } 496 499 public void handleRemoteException(RemoteException ex, RemoteSlave rslave) { 500 rslave.handleRemoteException(ex); 501 } 502 503 507 public boolean hasAvailableSlaves() { 508 for (Iterator iter = _rslaves.iterator(); iter.hasNext();) { 509 RemoteSlave rslave = (RemoteSlave) iter.next(); 510 if (rslave.isAvailable()) 511 return true; 512 } 513 return false; 514 } 515 516 public void reload() throws FileNotFoundException , IOException { 517 _slaveSelectionManager.reload(); 518 reloadRSlaves(); 519 } 520 public void reloadRSlaves() throws FileNotFoundException , IOException { 521 Document doc; 522 try { 523 doc = new SAXBuilder().build(new FileReader ("conf/slaves.xml")); 524 } catch (JDOMException e) { 525 throw (IOException ) new IOException ().initCause(e); 526 } 527 528 List slaveElements = doc.getRootElement().getChildren("slave"); 529 530 synchronized (_rslaves) { 532 nextslave : for ( 533 Iterator iter = _rslaves.iterator(); iter.hasNext();) { 534 RemoteSlave rslave = (RemoteSlave) iter.next(); 535 536 for (Iterator iterator = slaveElements.iterator(); 537 iterator.hasNext(); 538 ) { 539 Element slaveElement = (Element) iterator.next(); 540 if (rslave 541 .getName() 542 .equals(slaveElement.getChildText("name"))) { 543 logger.log( 544 Level.DEBUG, 545 rslave.getName() + " still in slaves.xml"); 546 continue nextslave; 547 } 548 } 549 logger.log( 550 Level.WARN, 551 rslave.getName() + " no longer in slaves.xml, unmerging"); 552 rslave.setOffline("Slave removed from slaves.xml"); 553 getConnectionManager().getRoot().unmergeDir(rslave); 554 iter.remove(); 556 } 557 } 558 559 nextelement : for ( 560 Iterator iterator = slaveElements.iterator(); 561 iterator.hasNext(); 562 ) { 563 Element slaveElement = (Element) iterator.next(); 564 565 for (Iterator iter = _rslaves.iterator(); iter.hasNext();) { 566 RemoteSlave rslave = (RemoteSlave) iter.next(); 567 568 if (slaveElement 569 .getChildText("name") 570 .equals(rslave.getName())) { 571 rslave.updateConfig(elementToProps(slaveElement)); 572 continue nextelement; 581 } 582 } RemoteSlave rslave = new RemoteSlave(elementToProps(slaveElement)); 584 rslave.setManager(this); 585 _rslaves.add(rslave); 586 logger.log(Level.INFO, "Added " + rslave.getName() + " to slaves"); 587 } 588 Collections.sort(_rslaves); 589 } 590 591 public void remerge(RemoteSlave rslave) 592 throws IOException , SlaveUnavailableException { 593 LinkedRemoteFile slaveroot; 594 slaveroot = rslave.getSlaveRoot(); 595 try { 596 getConnectionManager().getRoot().remerge(slaveroot, rslave); 597 } catch (RuntimeException t) { 598 logger.log(Level.FATAL, "", t); 599 rslave.setOffline(t.getMessage()); 600 throw t; 601 } 602 } 603 604 public class SlaveStatusUpdater extends Thread { 605 public SlaveStatusUpdater() { 606 super("SlaveStatusUpdater"); 607 } 608 public void run() { 609 logger.debug("started slavestatus updater thread"); 610 long low = Integer.MAX_VALUE; 611 long high = 0; 612 while (true) { 613 try { 614 for (Iterator iter = getAvailableSlaves().iterator(); 615 iter.hasNext(); 616 ) { 617 RemoteSlave slave = (RemoteSlave) iter.next(); 618 try { 619 long time = System.currentTimeMillis(); 620 slave.updateStatus(); 621 long difference = System.currentTimeMillis() - time; 622 if (difference < low) { 623 low = difference; 624 logger.debug( 625 low 626 + " low milliseconds were used to run updateStatus on " 627 + slave.getName()); 628 } 629 if (difference > high) { 630 high = difference; 631 logger.debug( 632 high 633 + " high milliseconds were used to run updateStatus on " 634 + slave.getName()); 635 } 636 } catch (SlaveUnavailableException e1) { 637 continue; 638 } 639 } 640 } catch (NoAvailableSlaveException e) { 641 } 642 try { 643 Thread.sleep(_cm.getConfig().getSlaveStatusUpdateTime()); 644 } catch (InterruptedException e1) { 645 } 646 } 647 } 648 } 649 650 public void saveFilelist() { 651 try { 652 SafeFileWriter out = new SafeFileWriter("files.mlst"); 653 try { 654 MLSTSerialize.serialize(getConnectionManager().getRoot(), out); 655 } finally { 656 out.close(); 657 } 658 } catch (IOException e) { 659 logger.warn("Error saving files.mlst", e); 660 } 661 } 662 663 664 public int verifySlaves() { 665 int removed = 0; 666 synchronized (_rslaves) { 667 for (Iterator i = _rslaves.iterator(); i.hasNext();) { 668 RemoteSlave slave = (RemoteSlave) i.next(); 669 if (!slave.isAvailablePing()) 670 removed++; 671 } 672 } 673 return removed; 674 } 675 676 public void updateSlave(String name, Hashtable args) throws IOException { 677 Element tmp; 678 679 String mask = (String )args.get("mask"); 680 String skey = (String )args.get("skey"); 681 String mkey = (String )args.get("mkey"); 682 String port = (String )args.get("port"); 683 String addr = (String )args.get("addr"); 684 685 Element slave = new Element("slave"); 687 688 tmp = new Element("name"); 689 tmp.setText(name); 690 slave.addContent(tmp); 691 692 tmp = new Element("addr"); 693 if (addr == null) { 694 tmp.setText("Dynamic"); 695 } else { 696 tmp.setText(addr); 697 } 698 slave.addContent(tmp); 699 700 if (mask != null) { 701 String [] masks = mask.split(","); 702 for (int i=0; i<masks.length; i++) { 703 tmp = new Element("mask"); 704 tmp.setText(masks[i]); 705 slave.addContent(tmp); 706 } 707 } 708 709 if (port != null) { 710 tmp = new Element("port"); 711 tmp.setText(port); 712 slave.addContent(tmp); 713 } 714 715 if (skey != null) { 716 tmp = new Element("slavepass"); 717 tmp.setText(skey); 718 slave.addContent(tmp); 719 } 720 721 if (mkey != null) { 722 tmp = new Element("masterpass"); 723 tmp.setText(mkey); 724 slave.addContent(tmp); 725 } 726 727 Document doc; 729 try { 730 doc = new SAXBuilder().build(new FileReader ("conf/slaves.xml")); 731 } catch (JDOMException e) { 732 throw (IOException ) new IOException ().initCause(e); 733 } 734 735 List slaveElements = doc.getRootElement().getChildren("slave"); 737 738 Element conf = null; 740 for (Iterator iterator = slaveElements.iterator(); iterator.hasNext(); ) { 741 Element slaveElement = (Element) iterator.next(); 742 if (name.equals(slaveElement.getChildText("name"))) { 743 conf = slaveElement; 744 } 745 } 746 747 if (conf == null) { 748 doc.getRootElement().addContent(slave); 750 } else { 751 slaveElements.remove(conf); 753 slaveElements.add(slave); 754 doc.setContent(slaveElements); 755 } 756 757 XMLOutputter out = new XMLOutputter(" ", true); 759 out.output(doc, new FileWriter ("conf/slaves.xml")); 760 out = null; 761 762 reloadRSlaves(); 764 765 } 766 } 767 | Popular Tags |