1 18 package org.drftpd.slave.socket; 19 20 import java.io.BufferedReader ; 21 import java.io.EOFException ; 22 import java.io.IOException ; 23 import java.io.InputStreamReader ; 24 import java.io.PrintWriter ; 25 import java.io.StringReader ; 26 import java.net.InetAddress ; 27 import java.net.InetSocketAddress ; 28 import java.net.Socket ; 29 import java.rmi.ConnectIOException ; 30 import java.rmi.RemoteException ; 31 import java.rmi.server.Unreferenced ; 32 import java.security.MessageDigest ; 33 import java.util.Hashtable ; 34 import java.util.Iterator ; 35 import java.util.Random ; 36 import java.util.Vector ; 37 38 import net.sf.drftpd.FatalException; 39 import net.sf.drftpd.SFVFile; 40 import net.sf.drftpd.master.ConnectionManager; 41 import net.sf.drftpd.remotefile.LinkedRemoteFile; 42 import net.sf.drftpd.remotefile.MLSTSerialize; 43 import net.sf.drftpd.slave.RootBasket; 44 import net.sf.drftpd.slave.Slave; 45 import net.sf.drftpd.slave.SlaveStatus; 46 import net.sf.drftpd.slave.Transfer; 47 import net.sf.drftpd.util.PortRange; 48 49 import org.apache.log4j.Logger; 50 51 55 public class SocketSlaveImpl extends Thread implements Slave, Unreferenced { 56 private static final Logger logger = 57 Logger.getLogger(SocketSlaveImpl.class); 58 59 private long disktotal = 0; 60 private long diskfree = 0; 61 62 private ConnectionManager _cman; 63 64 private boolean _uploadChecksums; 65 private boolean _downloadChecksums; 66 67 private String _name; 68 private String _spsw; 69 private String _mpsw; 70 private String _host; 71 private int _port; 72 73 private PortRange _portRange = new PortRange(); 74 75 private long _receivedBytes = 0; 76 private long _sentBytes = 0; 77 78 private Vector _transfers = new Vector (); 79 80 private Socket _sock = null; 81 private BufferedReader _sinp = null; 82 private PrintWriter _sout = null; 83 84 private Vector _que = new Vector (); 85 86 private LinkedRemoteFile _root; 87 88 public SocketSlaveImpl(ConnectionManager mgr, Hashtable cfg) 89 throws RemoteException { 90 Socket sock = null; 91 _name = (String ) cfg.get("name"); 92 _spsw = (String ) cfg.get("slavepass"); 93 _mpsw = (String ) cfg.get("masterpass"); 94 _host = (String ) cfg.get("addr"); 95 _port = Integer.parseInt((String ) cfg.get("port")); 96 logger.info("Starting connect to " + _name + "@" + _host + ":" + _port); 97 try { 98 sock = new java.net.Socket (_host, _port); 99 } catch (IOException e) { 100 if (e instanceof ConnectIOException 101 && e.getCause() instanceof EOFException ) { 102 logger.info( 103 "Check slaves.xml on the master that you are allowed to connect."); 104 } 105 logger.info("IOException: " + e.toString(), e); 106 try { 107 sock.close(); 108 } catch (Exception e1) { 109 } 110 } catch (Exception e) { 112 logger.warn("Exception: " + e.toString()); 113 try { 114 if (sock != null) 115 sock.close(); 116 } catch (Exception e2) { 117 } 118 } 119 init(mgr, cfg, sock); 120 } 121 122 public SocketSlaveImpl(ConnectionManager mgr, Hashtable cfg, Socket sock) 123 throws RemoteException { 124 _name = (String ) cfg.get("name"); 125 _spsw = (String ) cfg.get("slavepass"); 126 _mpsw = (String ) cfg.get("masterpass"); 127 _host = (String ) cfg.get("addr"); 128 _port = Integer.parseInt((String ) cfg.get("port")); 129 init(mgr, cfg, sock); 130 } 131 132 public void init(ConnectionManager mgr, Hashtable cfg, Socket sock) 133 throws RemoteException { 134 _cman = mgr; 135 _sock = sock; 136 try { 137 _sout = new PrintWriter (_sock.getOutputStream(), true); 138 _sinp = 139 new BufferedReader ( 140 new InputStreamReader (_sock.getInputStream())); 141 142 String seed = ""; 144 Random rand = new Random (); 145 for (int i = 0; i < 16; i++) { 146 String hex = Integer.toHexString(rand.nextInt(256)); 147 if (hex.length() < 2) 148 hex = "0" + hex; 149 seed += hex.substring(hex.length() - 2); 150 } 151 String pass = _mpsw + seed + _spsw; 152 MessageDigest md5 = MessageDigest.getInstance("MD5"); 153 md5.reset(); 154 md5.update(pass.getBytes()); 155 String hash = hash2hex(md5.digest()).toLowerCase(); 156 157 String banner = "INIT " + "servername" + " " + hash + " " + seed; 158 159 sendLine(banner); 160 161 String txt = readLine(5); 163 if (txt == null) { 164 throw new IOException ("Slave did not send banner !!"); 165 } 166 167 String sname = ""; 168 String spass = ""; 169 String sseed = ""; 170 try { 171 String [] items = txt.split(" "); 172 sname = items[1].trim(); 173 spass = items[2].trim(); 174 sseed = items[3].trim(); 175 } catch (Exception e) { 176 SocketSlaveListener.invalidSlave("INITFAIL BadKey", _sock); 177 } 178 pass = _spsw + sseed + _mpsw; 180 md5 = MessageDigest.getInstance("MD5"); 181 md5.reset(); 182 md5.update(pass.getBytes()); 183 hash = hash2hex(md5.digest()).toLowerCase(); 184 185 if (!sname.equals(_name)) { 187 SocketSlaveListener.invalidSlave("INITFAIL Unknown", _sock); 188 } 189 if (!spass.toLowerCase().equals(hash.toLowerCase())) { 190 SocketSlaveListener.invalidSlave("INITFAIL BadKey", _sock); 191 } 192 start(); 193 _cman.getSlaveManager().addSlave(_name, this, getSlaveStatus(), -1); 194 } catch (IOException e) { 195 if (e instanceof ConnectIOException 196 && e.getCause() instanceof EOFException ) { 197 logger.info( 198 "Check slaves.xml on the master that you are allowed to connect."); 199 } 200 logger.info("IOException: " + e.toString()); 201 try { 202 sock.close(); 203 } catch (Exception e1) { 204 } 205 } catch (Exception e) { 207 logger.warn("Exception: " + e.toString()); 208 try { 209 sock.close(); 210 } catch (Exception e2) { 211 } 212 } 213 System.gc(); 214 } 215 216 private String hash2hex(byte[] bytes) { 217 String res = ""; 218 for (int i = 0; i < 16; i++) { 219 String hex = Integer.toHexString((int) bytes[i]); 220 if (hex.length() < 2) 221 hex = "0" + hex; 222 res += hex.substring(hex.length() - 2); 223 } 224 return res; 225 } 226 227 231 public InetAddress getAddress() { 232 return _sock.getInetAddress(); 233 } 234 235 public boolean getDownloadChecksums() { 236 return _downloadChecksums; 237 } 238 239 public boolean getUploadChecksums() { 240 return _uploadChecksums; 241 } 242 243 public RootBasket getRoots() { 244 return null; 245 } 246 247 251 private class Command { 252 String _name; 253 Hashtable _args; 254 int _stat; 255 Hashtable _data = new Hashtable (); 256 boolean _done = false; 257 258 public Command(String name, Hashtable args) { 259 _name = name; 260 _args = args; 261 _stat = 0; 262 } 263 264 public String getName() { 265 return _name; 266 } 267 public Hashtable getArgs() { 268 return _args; 269 } 270 public Hashtable getData() { 271 return _data; 272 } 273 public int getStatus() { 274 return _stat; 275 } 276 public void setStatus(int stat) { 277 _stat = stat; 278 } 279 public void finished() { 280 _done = true; 281 } 282 } 283 284 288 public void run() { 289 int tick = 0; 290 while (true) { 291 while (true) { 292 try { 293 synchronized (_que) { 294 sleep(100); 295 } 296 } catch (Exception e) { 297 e.printStackTrace(); 298 } 299 if (_que.size() != 0) { 300 runque(); 301 tick = 0; 302 break; 303 } 304 readLine(0); 305 tick++; 306 if (tick < 300) 307 continue; 308 tick = 0; 309 _sout.println("ping"); 310 logger.info(_name + "< ping"); 311 if (readLine(5) == null) { 312 shutdown(null); 314 return; 315 } 316 } 317 } 318 } 319 320 private void runque() { 321 while (_que.size() > 0) { 323 Command cmd = (Command) _que.get(0); 324 _que.remove(0); 325 try { 326 process(cmd); 327 cmd.finished(); 328 } catch (Exception e) { 329 logger.info(_name + "$ exception=" + e.toString()); 330 shutdown(cmd); 332 return; 333 } 334 } 335 } 336 337 private void shutdown(Command cmd) { 338 if (cmd != null) { 340 cmd.setStatus(-1); 341 cmd.finished(); 342 } 343 while (_que.size() > 0) { 345 cmd = (Command) _que.get(0); 346 _que.remove(0); 347 cmd.setStatus(-1); 348 cmd.finished(); 349 } 350 try { 352 _cman.getSlaveManager().delSlave(_name, "Connection lost"); 353 } catch (Exception e) { 354 } 355 } 356 357 private void process(Command cmd) { 358 if (cmd.getName().equals("ping")) { 359 processPing(cmd); 360 return; 361 } 362 if (cmd.getName().equals("send")) { 363 processSend(cmd); 364 return; 365 } 366 if (cmd.getName().equals("recv")) { 367 processRecv(cmd); 368 return; 369 } 370 if (cmd.getName().equals("dump")) { 371 processDump(cmd); 372 return; 373 } 374 if (cmd.getName().equals("csum")) { 375 processCRC32(cmd); 376 return; 377 } 378 if (cmd.getName().equals("renm")) { 379 processRename(cmd); 380 return; 381 } 382 if (cmd.getName().equals("dele")) { 383 processDelete(cmd); 384 return; 385 } 386 if (cmd.getName().equals("conn")) { 387 processConnect(cmd); 388 return; 389 } 390 if (cmd.getName().equals("disk")) { 391 processDisk(cmd); 392 return; 393 } 394 if (cmd.getName().equals("list")) { 395 processList(cmd); 396 return; 397 } 398 } 399 400 404 private void sendLine(String line) { 405 _sout.println(line); 406 logger.info(_name + "< " + line); 407 } 408 409 private String readLine() { 410 return readLine(-1); 411 } 412 413 private String readLine(int secs) { 414 int cnt = secs * 10; 415 try { 416 while (true) { 417 while (!_sinp.ready()) { 418 if (cnt < 1) 419 return null; 420 sleep(100); 421 cnt--; 422 if (cnt == 0) 423 return null; 424 } 425 String txt = _sinp.readLine(); 426 logger.info(_name + "> " + txt); 427 if (txt.startsWith("XFER")) { 428 xferMessage(txt); 429 } else { 430 return txt; 431 } 432 } 433 } catch (Exception e) { 434 return null; 435 } 436 } 437 438 442 private void xferMessage(String msg) { 443 String [] items = msg.split(" "); 444 String xid = items[1]; 445 String sta = items[2]; 446 String cnt = items[3]; 447 String sum = items[4]; 448 String eid = items[5]; 449 String adr = items[6]; 450 451 long tid = Long.parseLong(xid); 452 long byt = Long.parseLong(cnt); 453 long err = Long.parseLong(eid); 454 long crc = Long.parseLong(sum, 16); 455 456 synchronized (_transfers) { 457 for (int i = 0; i < _transfers.size(); i++) { 458 SocketTransferImpl tmp = (SocketTransferImpl) _transfers.get(i); 459 if (tmp.getID() == tid) { 460 tmp.updateStats(sta, byt, crc, err, adr); 462 break; 463 } 464 } 465 } 466 } 467 468 public void addTransfer(SocketTransferImpl transfer) { 469 synchronized (_transfers) { 470 _transfers.add(transfer); 471 } 472 } 473 474 public void removeTransfer(SocketTransferImpl transfer) { 475 synchronized (_transfers) { 476 switch (transfer.getDirection()) { 477 case Transfer.TRANSFER_RECEIVING_UPLOAD : 478 _receivedBytes += transfer.getTransfered(); 479 break; 480 case Transfer.TRANSFER_SENDING_DOWNLOAD : 481 _sentBytes += transfer.getTransfered(); 482 break; 483 default : 484 throw new IllegalArgumentException (); 485 } 486 if (!_transfers.remove(transfer)) 487 throw new IllegalStateException (); 488 } 489 } 490 491 495 private void processPing(Command cmd) { 496 sendLine("ping"); 497 if (readLine(5) == null) 498 cmd.setStatus(-1); 499 } 500 501 private void processCRC32(Command cmd) { 502 String msg = "crc32 \"" + cmd.getArgs().get("path") + "\""; 503 sendLine(msg); 504 String res; 505 if ((res = readLine(5)) == null) { 506 cmd.setStatus(-1); 507 } else { 508 if (res.startsWith("CRCFAIL ")) { 510 cmd.setStatus(1); 511 return; 512 } 513 cmd._data.put("crc32", res.substring(6)); 514 } 515 } 516 517 private void processDump(Command cmd) { 518 String msg = "dump \"" + cmd.getArgs().get("path") + "\""; 519 sendLine(msg); 520 String res; 521 if ((res = readLine(5)) == null) { 522 cmd.setStatus(-1); 523 } else { 524 if (res.startsWith("DUMPFAIL ")) { 526 cmd.setStatus(1); 527 return; 528 } 529 StringBuffer buf = new StringBuffer (65536); 530 while ((res = readLine(5)) != null) { 531 if (res.equals("DUMPEND")) 532 break; 533 buf.append(res); 534 } 535 if (res == null) 536 cmd.setStatus(1); 537 cmd._data.put("data", new String (Base64.decode(buf.toString()))); 538 } 539 } 540 541 private void processDelete(Command cmd) { 542 String msg = "del \"" + cmd.getArgs().get("path") + "\""; 543 sendLine(msg); 544 String res; 545 if ((res = readLine(5)) == null) { 546 cmd.setStatus(-1); 547 } else { 548 if (res.startsWith("DELFAIL ")) 550 cmd.setStatus(1); 551 } 552 } 553 554 private void processRename(Command cmd) { 555 String msg = 556 "ren " 557 + "\"" 558 + cmd.getArgs().get("from") 559 + "\" " 560 + "\"" 561 + cmd.getArgs().get("path") 562 + "/" 563 + cmd.getArgs().get("name") 564 + "\""; 565 sendLine(msg); 566 String res; 567 if ((res = readLine(5)) == null) { 568 cmd.setStatus(-1); 569 } else { 570 if (res.startsWith("RENFAIL ")) 572 cmd.setStatus(1); 573 } 574 } 575 576 private void processSend(Command cmd) { 577 String msg = 578 "send" 579 + " \"" 580 + cmd.getArgs().get("path") 581 + "\"" 582 + " " 583 + cmd.getArgs().get("conn") 584 + " " 585 + cmd.getArgs().get("offs"); 586 sendLine(msg); 587 String res; 588 if ((res = readLine(5)) == null || res.startsWith("SENDFAIL")) { 589 cmd.setStatus(-1); 590 } else { 591 } 593 } 594 595 private void processRecv(Command cmd) { 596 String msg = 597 "recv" 598 + " \"" 599 + cmd.getArgs().get("path") 600 + "\"" 601 + " " 602 + cmd.getArgs().get("conn") 603 + " " 604 + cmd.getArgs().get("offs"); 605 sendLine(msg); 606 String res; 607 if ((res = readLine(5)) == null || res.startsWith("RECVFAIL")) { 608 cmd.setStatus(-1); 609 } else { 610 } 612 } 613 614 private void processConnect(Command cmd) { 615 String msg = "conn"; 616 if (cmd.getArgs().containsKey("addr")) { 617 msg = msg + " " + cmd.getArgs().get("addr"); 618 } 619 sendLine(msg); 620 String res; 621 if ((res = readLine(5)) == null || res.startsWith("CONNFAIL")) { 622 cmd.setStatus(-1); 623 } else { 624 String [] items = res.split(" "); 626 cmd.getData().put("conn", items[1]); 627 if (items.length > 2) 628 cmd.getData().put("addr", items[2]); 629 } 630 } 631 632 private void processDisk(Command cmd) { 633 String msg = "disk"; 634 sendLine(msg); 635 String res; 636 if ((res = readLine(5)) == null || res.startsWith("DISKFAIL")) { 637 cmd.setStatus(-1); 638 } else { 639 String [] items = res.split(" "); 641 cmd.getData().put("total", items[1]); 642 cmd.getData().put("free", items[2]); 643 } 644 } 645 646 private void processList(Command cmd) { 647 String msg = "list"; 648 sendLine(msg); 649 String txt; 650 StringBuffer sbuf = new StringBuffer (65536); 651 if ((txt = readLine(20)) == null || txt.startsWith("LISTFAIL")) { 652 cmd.setStatus(-1); 653 } else { 654 while ((txt = readLine(20)) != null) { 655 if (txt.equals("LISTEND")) 656 break; 657 if (txt.equals("LISTBEGIN")) 658 continue; 659 if (!txt.startsWith("/") 660 && !txt.equals("") 661 && txt.indexOf("type=dir;") == -1) 662 sbuf.append("x.slaves=" + _name + ";"); 663 sbuf.append(txt); 664 sbuf.append((String ) "\n"); 665 } 666 try { 667 LinkedRemoteFile root = 668 MLSTSerialize.unserialize( 669 _cman.getConfig(), 670 new StringReader (sbuf.toString()), 671 _cman.getSlaveManager().getSlaveList()); 672 _root = root; 673 } catch (Exception e) { 674 logger.info("LIST Exception from " + getName(), e); 675 } 676 } 677 } 678 679 683 public Hashtable doCommand(String name, Hashtable args) { 684 Command cmd = new Command(name, args); 685 _que.add(cmd); 686 while (!cmd._done) { 687 try { 688 sleep(100); 689 } catch (Exception e) { 690 } 691 } 692 if (cmd.getStatus() == -1) 693 return null; 694 return cmd.getData(); 695 } 696 697 public void ping() { 698 Hashtable args = new Hashtable (); 699 700 logger.debug("Trying PING"); 701 Hashtable result = doCommand("ping", args); 702 return; 703 } 704 705 public long checkSum(String path) throws IOException { 706 Hashtable args = new Hashtable (); 707 708 logger.debug("Checksumming: " + path); 709 args.put("path", path); 710 Hashtable result = doCommand("csum", args); 711 if (result == null) 712 throw new IOException ("Checksum command failed"); 713 return Integer.parseInt((String ) result.get("crc32")); 714 } 715 716 public String dumpfile(String path) throws IOException { 717 Hashtable args = new Hashtable (); 718 719 logger.debug("Retrieving: " + path); 720 args.put("path", path); 721 Hashtable result = doCommand("dump", args); 722 if (result == null) 723 throw new IOException ("Dump command failed"); 724 return (String ) result.get("data"); 725 } 726 727 public void delete(String path) throws IOException { 728 Hashtable args = new Hashtable (); 729 730 logger.debug("Deleting: " + path); 731 args.put("path", path); 732 Hashtable result = doCommand("dele", args); 733 if (result == null) 734 throw new IOException ("Delete command failed"); 735 return; 736 } 737 738 public void rename(String from, String toDirPath, String toName) 739 throws IOException { 740 Hashtable args = new Hashtable (); 741 742 logger.debug("Renaming: " + from + " -> " + toDirPath + "/" + toName); 743 args.put("from", from); 744 args.put("path", toDirPath); 745 args.put("name", toName); 746 Hashtable result = doCommand("renm", args); 747 if (result == null) 748 throw new IOException ("Rename command failed"); 749 return; 750 } 751 752 public SFVFile getSFVFile(String path) throws IOException { 753 String sfv = dumpfile(path); 754 return new SFVFile(new BufferedReader (new StringReader (sfv))); 755 } 756 757 public Transfer connect(InetSocketAddress addr, boolean encrypted) 758 throws RemoteException { 759 Hashtable args = new Hashtable (); 760 761 logger.debug("Connect: " + addr.toString()); 762 args.put("addr", addr.toString().substring(1)); 763 Hashtable result = doCommand("conn", args); 764 if (result == null) 765 return null; 766 Transfer tmp = (Transfer) new SocketTransferImpl(this, result); 767 return tmp; 768 } 769 770 public Transfer listen(boolean encrypted) 771 throws RemoteException , IOException { 772 Hashtable args = new Hashtable (); 773 774 logger.debug("Listen: "); 775 Hashtable result = doCommand("conn", args); 776 if (result == null) 777 return null; 778 Transfer tmp = (Transfer) new SocketTransferImpl(this, result); 779 return tmp; 780 } 781 782 public SlaveStatus getSlaveStatus() { 783 int throughputUp = 0, throughputDown = 0; 784 int transfersUp = 0, transfersDown = 0; 785 long bytesReceived, bytesSent; 786 synchronized (_transfers) { 787 bytesReceived = _receivedBytes; 788 bytesSent = _sentBytes; 789 for (Iterator i = _transfers.iterator(); i.hasNext();) { 790 SocketTransferImpl transfer = (SocketTransferImpl) i.next(); 791 switch (transfer.getDirection()) { 792 case Transfer.TRANSFER_RECEIVING_UPLOAD : 793 throughputUp += transfer.getXferSpeed(); 794 transfersUp += 1; 795 bytesReceived += transfer.getTransfered(); 796 break; 797 case Transfer.TRANSFER_SENDING_DOWNLOAD : 798 throughputDown += transfer.getXferSpeed(); 799 transfersDown += 1; 800 bytesSent += transfer.getTransfered(); 801 break; 802 default : 803 throw new FatalException("unrecognized direction"); 804 } 805 } 806 } 807 Hashtable args = new Hashtable (); 808 logger.debug("Status: "); 809 Hashtable result = doCommand("disk", args); 810 long dtotal, dfree; 811 if (result != null) { 812 disktotal = Long.parseLong(result.get("total").toString()); 813 diskfree = Long.parseLong(result.get("free").toString()); 814 } 815 try { 816 return new SlaveStatus( 817 diskfree, 818 disktotal, bytesSent, 821 bytesReceived, 822 throughputUp, 823 transfersUp, 824 throughputDown, 825 transfersDown); 826 } catch (Exception ex) { 827 ex.printStackTrace(); 828 throw new RuntimeException (ex.toString()); 829 } 830 } 831 832 public LinkedRemoteFile getSlaveRoot() throws IOException { 833 Hashtable args = new Hashtable (); 834 logger.debug("List: "); 835 Hashtable result = doCommand("list", args); 836 return _root; 837 } 838 839 843 public void unreferenced() { 844 logger.info("unreferenced"); 845 System.exit(0); 846 } 847 848 public InetAddress getPeerAddress() { 849 try { 850 return InetAddress.getLocalHost(); 851 } catch (Exception e) { 852 throw new RuntimeException (); 853 } 854 } 855 856 } 857 | Popular Tags |