package org.drftpd.slave.async;

import java.io.BufferedReader;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.rmi.ConnectIOException;
import java.rmi.RemoteException;
import java.rmi.server.Unreferenced;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Random;
import java.util.Stack;
import java.util.Vector;

import net.sf.drftpd.FatalException;
import net.sf.drftpd.SFVFile;
import net.sf.drftpd.master.ConnectionManager;
import net.sf.drftpd.remotefile.LinkedRemoteFile;
import net.sf.drftpd.remotefile.MLSTSerialize;
import net.sf.drftpd.slave.RootBasket;
import net.sf.drftpd.slave.Slave;
import net.sf.drftpd.slave.SlaveStatus;
import net.sf.drftpd.slave.Transfer;
import net.sf.drftpd.util.PortRange;

import org.apache.log4j.Logger;

/**
 * Master-side proxy for a slave that is driven over a line-based text
 * protocol on a plain socket.
 *
 * <p>Commands are written with {@link #sendCommand(String, String)}; each
 * command is tagged with a two-character hexadecimal channel id taken from a
 * pool of 256. A background thread ({@link #run()}) polls the socket for
 * reply lines and hands each one to the handler matching the name of the
 * command registered on that channel.
 */
public class AsyncSlave extends Thread implements Slave, Unreferenced {
    private static final Logger logger =
        Logger.getLogger(AsyncSlave.class);

    /** Number of command channels ("00" .. "ff"). */
    private static final int CHANNEL_COUNT = 256;

    /** Number of random bytes in the handshake seed (sent as hex). */
    private static final int SEED_BYTES = 16;

    /** Last disk figures parsed from a "disk" reply. */
    private long disktotal = 0;
    private long diskfree = 0;

    private ConnectionManager _cman;

    private boolean _uploadChecksums;
    private boolean _downloadChecksums;

    private String _name;
    private String _spsw;
    private String _mpsw;
    private String _host;
    private int _port;

    private PortRange _portRange = new PortRange();

    /** Byte totals of transfers already removed via removeTransfer(). */
    private long _receivedBytes = 0;
    private long _sentBytes = 0;

    /** Active AsyncTransfer objects; guarded by synchronizing on the vector. */
    private Vector _transfers = new Vector();

    private Socket _sock = null;
    private BufferedReader _sinp = null;
    private PrintWriter _sout = null;

    /** Channel id (String) to in-flight AsyncCommand; idle channels are absent. */
    protected Hashtable commands = new Hashtable();

    /** Pool of currently unused channel ids. */
    protected Stack availcmd = new Stack();

    /** File tree produced by the most recent successful "list" reply. */
    private LinkedRemoteFile _root;

    /**
     * Connects to the address and port given in <code>cfg</code> and then
     * performs the handshake via {@link #init}.
     *
     * <p>A failed connect is logged rather than thrown; the instance is then
     * left without a socket.
     *
     * @param mgr owning connection manager
     * @param cfg settings; reads "name", "slavepass", "masterpass", "addr"
     *            and "port"
     */
    public AsyncSlave(ConnectionManager mgr, Hashtable cfg)
        throws RemoteException {
        readConfig(cfg);
        logger.info("Starting connect to " + _name + "@" + _host + ":" + _port);
        Socket sock = null;
        try {
            sock = new Socket(_host, _port);
        } catch (IOException e) {
            if (e instanceof ConnectIOException
                && e.getCause() instanceof EOFException) {
                logger.info(
                    "Check slaves.xml on the master that you are allowed to connect.");
            }
            logger.info("IOException: " + e.toString(), e);
            // The Socket constructor threw, so there is no socket to close.
        } catch (Exception e) {
            logger.warn("Exception: " + e.toString());
        }
        init(mgr, cfg, sock);
    }

    /**
     * Wraps an already connected socket and performs the handshake via
     * {@link #init}.
     *
     * @param mgr  owning connection manager
     * @param cfg  settings; reads "name", "slavepass", "masterpass", "addr"
     *             and "port"
     * @param sock connected socket to the slave
     */
    public AsyncSlave(ConnectionManager mgr, Hashtable cfg, Socket sock)
        throws RemoteException {
        readConfig(cfg);
        init(mgr, cfg, sock);
    }

    /** Copies the connection settings out of the configuration table. */
    private void readConfig(Hashtable cfg) {
        _name = (String) cfg.get("name");
        _spsw = (String) cfg.get("slavepass");
        _mpsw = (String) cfg.get("masterpass");
        _host = (String) cfg.get("addr");
        _port = Integer.parseInt((String) cfg.get("port"));
    }

    /**
     * Fills the channel pool, exchanges the INIT banners with the slave,
     * verifies the slave's name and password hash, starts the reader thread
     * and registers this slave with the slave manager.
     *
     * <p>I/O and other failures are logged and the socket is closed; they are
     * not rethrown. If the slave's banner is rejected,
     * <code>AsyncSlaveListener.invalidSlave</code> is called and the slave is
     * not registered.
     *
     * @param mgr  owning connection manager
     * @param cfg  configuration table (not read here)
     * @param sock connected socket, or <code>null</code> if connecting failed
     */
    public void init(ConnectionManager mgr, Hashtable cfg, Socket sock)
        throws RemoteException {
        _cman = mgr;
        _sock = sock;
        for (int i = 0; i < CHANNEL_COUNT; i++) {
            String key = Integer.toHexString(i);
            if (key.length() < 2) {
                key = "0" + key;
            }
            // Hashtable rejects null values, so an idle channel is simply
            // not present in 'commands'.
            availcmd.push(key);
        }
        if (sock == null) {
            logger.warn("No connection to " + _name + "; handshake skipped");
            return;
        }
        try {
            _sout = new PrintWriter(_sock.getOutputStream(), true);
            _sinp =
                new BufferedReader(
                    new InputStreamReader(_sock.getInputStream()));

            // Our banner: md5(masterpass + seed + slavepass) plus the seed.
            byte[] seedBytes = new byte[SEED_BYTES];
            new SecureRandom().nextBytes(seedBytes);
            String seed = hash2hex(seedBytes);
            String hash = md5Hex(_mpsw + seed + _spsw);
            sendLine("INIT " + "servername" + " " + hash + " " + seed);

            String txt = readLine(5);
            if (txt == null) {
                throw new IOException("Slave did not send banner !!");
            }

            String sname;
            String spass;
            String sseed;
            try {
                String[] items = txt.split(" ");
                sname = items[1].trim();
                spass = items[2].trim();
                sseed = items[3].trim();
            } catch (Exception e) {
                AsyncSlaveListener.invalidSlave("INITFAIL BadKey", _sock);
                return;
            }

            // The slave's banner must carry md5(slavepass + seed + masterpass).
            String expected = md5Hex(_spsw + sseed + _mpsw);
            if (!sname.equals(_name)) {
                AsyncSlaveListener.invalidSlave("INITFAIL Unknown", _sock);
                return;
            }
            if (!spass.equalsIgnoreCase(expected)) {
                AsyncSlaveListener.invalidSlave("INITFAIL BadKey", _sock);
                return;
            }

            // The reader thread has to be running before getSlaveStatus() is
            // called: that call sends a "disk" command and waits for a reply
            // which only run() dispatches.
            start();
            _cman.getSlaveManager().addSlave(_name, this, getSlaveStatus(), -1);
        } catch (IOException e) {
            if (e instanceof ConnectIOException
                && e.getCause() instanceof EOFException) {
                logger.info(
                    "Check slaves.xml on the master that you are allowed to connect.");
            }
            logger.info("IOException: " + e.toString());
            closeQuietly(sock);
        } catch (Exception e) {
            logger.warn("Exception: " + e.toString());
            closeQuietly(sock);
        }
    }

    /** Closes the socket if there is one, ignoring any close failure. */
    private static void closeQuietly(Socket sock) {
        if (sock == null) {
            return;
        }
        try {
            sock.close();
        } catch (IOException ignored) {
            // best effort: the connection is being abandoned anyway
        }
    }

    /** Returns the lower-case hex MD5 digest of <code>text</code>. */
    private String md5Hex(String text) throws NoSuchAlgorithmException {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        md5.update(text.getBytes());
        return hash2hex(md5.digest());
    }

    /**
     * Renders every byte of <code>bytes</code> as two lower-case hex digits.
     */
    private String hash2hex(byte[] bytes) {
        StringBuffer res = new StringBuffer(bytes.length * 2);
        for (int i = 0; i < bytes.length; i++) {
            int value = bytes[i] & 0xff; // treat the byte as unsigned
            if (value < 0x10) {
                res.append('0');
            }
            res.append(Integer.toHexString(value));
        }
        return res.toString();
    }

    /** Returns the remote address of the slave socket. */
    public InetAddress getAddress() {
        return _sock.getInetAddress();
    }

    public boolean getDownloadChecksums() {
        return _downloadChecksums;
    }

    public boolean getUploadChecksums() {
        return _uploadChecksums;
    }

    /** Always returns <code>null</code> in this implementation. */
    public RootBasket getRoots() {
        return null;
    }

    /**
     * Reader loop: every 100 ms drains all pending lines from the slave and
     * dispatches each one. Ends, after calling shutdown(), when readLine
     * reports a closed or failed connection.
     */
    public void run() {
        while (true) {
            try {
                sleep(100);
            } catch (InterruptedException ignored) {
                // keep polling; the loop only ends when the connection does
            }
            String msg = readLine(0);
            while (msg != null && !msg.equals("")) {
                handleMessage(msg);
                msg = readLine(0);
            }
            if (msg == null) {
                shutdown();
                return;
            }
        }
    }

    /**
     * Looks up the command a reply line belongs to and passes the payload to
     * the matching handler. Lines that cannot be matched, and handler
     * failures, are logged so that one bad line does not end the reader
     * thread.
     */
    private void handleMessage(String msg) {
        // NOTE(review): the channel id is the text before the first comma,
        // while the payload is taken from offset 3 - confirm both against the
        // slave's wire format.
        String[] items = msg.split(",");
        AsyncCommand cmd = null;
        if (items.length > 0) {
            cmd = (AsyncCommand) commands.get(items[0]);
        }
        if (cmd == null || msg.length() < 3) {
            logger.warn(_name + ": ignoring unexpected message: " + msg);
            return;
        }
        String payload = msg.substring(3);
        try {
            dispatch(cmd, payload);
        } catch (RuntimeException e) {
            logger.warn(_name + ": failed to process message: " + msg, e);
        }
    }

    /** Routes a payload to the handler for the command's name. */
    private void dispatch(AsyncCommand cmd, String msg) {
        String name = cmd._name;
        if (name.equals("ping")) {
            processPing(cmd, msg);
        } else if (name.equals("xfer")) {
            xferMessage(msg);
        } else if (name.equals("disk")) {
            processDisk(cmd, msg);
        } else if (name.equals("conn")) {
            processConnect(cmd, msg);
        } else if (name.equals("send")) {
            processSend(cmd, msg);
        } else if (name.equals("recv")) {
            processRecv(cmd, msg);
        } else if (name.equals("renm")) {
            processRename(cmd, msg);
        } else if (name.equals("dele")) {
            processDelete(cmd, msg);
        } else if (name.equals("list")) {
            processList(cmd, msg);
        } else if (name.equals("csum")) {
            processCRC32(cmd, msg);
        } else if (name.equals("dump")) {
            processDump(cmd, msg);
        }
    }

    /** Removes this slave from the slave manager; failures are ignored. */
    private void shutdown() {
        try {
            _cman.getSlaveManager().delSlave(_name, "Connection lost");
        } catch (Exception ignored) {
            // best effort: the connection is already gone
        }
    }

    /**
     * Writes one line to the slave and logs it. Writers are serialized on the
     * output stream.
     */
    public void sendLine(String line) {
        synchronized (_sout) {
            _sout.println(line);
            logger.info(_name + "< " + line);
        }
    }

    /**
     * Reads a line without waiting: a negative timeout behaves like zero, so
     * "" is returned when nothing is pending.
     */
    private String readLine() {
        return readLine(-1);
    }

    /**
     * Reads one line from the slave, polling every 100 ms for up to
     * <code>secs</code> seconds.
     *
     * @return the line; "" if nothing arrived in time; <code>null</code> on
     *         end of stream or on any error
     */
    private String readLine(int secs) {
        int remaining = secs * 10;
        try {
            while (!_sinp.ready()) {
                if (remaining < 1) {
                    return "";
                }
                sleep(100);
                remaining--;
            }
            String txt = _sinp.readLine();
            if (txt == null) {
                return null;
            }
            logger.info(_name + "> " + txt);
            return txt;
        } catch (Exception e) {
            logger.debug(_name + ": read failed: " + e.toString());
            return null;
        }
    }

    /**
     * Applies a transfer status message to the transfer with the matching
     * id. Space-separated fields 1..6 are: transfer id, status, byte count,
     * checksum (hex), error id, address. Malformed messages are logged and
     * dropped.
     */
    private void xferMessage(String msg) {
        String[] items = msg.split(" ");
        if (items.length < 7) {
            logger.warn(_name + ": malformed transfer message: " + msg);
            return;
        }
        String sta = items[2];
        String adr = items[6];
        long tid;
        long byt;
        long err;
        long crc;
        try {
            tid = Long.parseLong(items[1]);
            byt = Long.parseLong(items[3]);
            err = Long.parseLong(items[5]);
            crc = Long.parseLong(items[4], 16);
        } catch (NumberFormatException e) {
            logger.warn(_name + ": malformed transfer message: " + msg);
            return;
        }

        synchronized (_transfers) {
            for (int i = 0; i < _transfers.size(); i++) {
                AsyncTransfer tmp = (AsyncTransfer) _transfers.get(i);
                if (tmp.getID() == tid) {
                    tmp.updateStats(sta, byt, crc, err, adr);
                    break;
                }
            }
        }
    }

    /** Registers an active transfer. */
    public void addTransfer(AsyncTransfer transfer) {
        synchronized (_transfers) {
            _transfers.add(transfer);
        }
    }

    /**
     * Unregisters a transfer and adds its byte count to the running totals.
     *
     * @throws IllegalArgumentException if the direction is neither upload
     *         nor download
     * @throws IllegalStateException if the transfer was not registered
     */
    public void removeTransfer(AsyncTransfer transfer) {
        synchronized (_transfers) {
            switch (transfer.getDirection()) {
                case Transfer.TRANSFER_RECEIVING_UPLOAD :
                    _receivedBytes += transfer.getTransfered();
                    break;
                case Transfer.TRANSFER_SENDING_DOWNLOAD :
                    _sentBytes += transfer.getTransfered();
                    break;
                default :
                    throw new IllegalArgumentException();
            }
            if (!_transfers.remove(transfer)) {
                throw new IllegalStateException();
            }
        }
    }

    /** Any reply to a ping completes it successfully. */
    private void processPing(AsyncCommand cmd, String res) {
        cmd.setStatus(0);
    }

    /** Stores the checksum text (reply from offset 6) under "crc32". */
    private void processCRC32(AsyncCommand cmd, String res) {
        if (res.startsWith("CRCFAIL ")) {
            cmd.setStatus(1);
            return;
        }
        cmd._data.put("crc32", res.substring(6));
        cmd.setStatus(0);
    }

    /**
     * Collects dumped file lines in the "data" buffer until "DUMPEND". Each
     * line is stored with a trailing newline so the content can be read back
     * line by line; the terminator itself is not stored.
     */
    private void processDump(AsyncCommand cmd, String res) {
        if (res.startsWith("DUMPFAIL ")) {
            cmd.setStatus(1);
            return;
        }
        if (!cmd._data.containsKey("data")) {
            cmd._data.put("data", new StringBuffer(65536));
        }
        if (res.equals("DUMPEND")) {
            cmd.setStatus(0);
            return;
        }
        StringBuffer buf = (StringBuffer) cmd._data.get("data");
        buf.append(res);
        buf.append("\n");
    }

    private void processDelete(AsyncCommand cmd, String res) {
        cmd.setStatus(res.startsWith("DELFAIL ") ? 1 : 0);
    }

    private void processRename(AsyncCommand cmd, String res) {
        cmd.setStatus(res.startsWith("RENFAIL ") ? 1 : 0);
    }

    private void processSend(AsyncCommand cmd, String res) {
        cmd.setStatus(res.startsWith("SENDFAIL ") ? -1 : 0);
    }

    private void processRecv(AsyncCommand cmd, String res) {
        cmd.setStatus(res.startsWith("RECVFAIL ") ? -1 : 0);
    }

    /**
     * Stores field 1 of the reply under "conn" and, when present, field 2
     * under "addr".
     */
    private void processConnect(AsyncCommand cmd, String res) {
        if (res.startsWith("CONNFAIL ")) {
            cmd.setStatus(-1);
            return;
        }
        String[] items = res.split(" ");
        if (items.length < 2) {
            cmd.setStatus(-1);
            return;
        }
        cmd.getData().put("conn", items[1]);
        if (items.length > 2) {
            cmd.getData().put("addr", items[2]);
        }
        cmd.setStatus(0);
    }

    /**
     * Stores fields 1 and 2 of the reply under "total" and "free" and marks
     * the command complete.
     */
    private void processDisk(AsyncCommand cmd, String res) {
        if (res.startsWith("DISKFAIL ")) {
            cmd.setStatus(-1);
            return;
        }
        String[] items = res.split(" ");
        if (items.length < 3) {
            cmd.setStatus(-1);
            return;
        }
        cmd.getData().put("total", items[1]);
        cmd.getData().put("free", items[2]);
        cmd.setStatus(0);
    }

    /**
     * Collects listing lines between "LISTBEGIN" and "LISTEND" in the "data"
     * buffer. On "LISTEND" the buffer is unserialized into the slave's file
     * tree and the command is completed (status -1 if that fails).
     */
    private void processList(AsyncCommand cmd, String res) {
        if (!cmd._data.containsKey("data")) {
            cmd._data.put("data", new StringBuffer(65536));
        }
        StringBuffer buf = (StringBuffer) cmd._data.get("data");
        if (res.startsWith("LISTFAIL")) {
            cmd.setStatus(-1);
            return;
        }
        if (res.equals("LISTBEGIN")) {
            return;
        }
        if (res.equals("LISTEND")) {
            try {
                _root =
                    MLSTSerialize.unserialize(
                        _cman.getConfig(),
                        new StringReader(buf.toString()),
                        _cman.getSlaveManager().getSlaveList());
                cmd.setStatus(0);
            } catch (Exception e) {
                logger.info("LIST Exception from " + getName(), e);
                cmd.setStatus(-1);
            }
            return;
        }
        // Lines that are not a "/..." path, not empty and not a directory
        // entry get this slave's name prepended as an x.slaves fact.
        if (!res.startsWith("/")
            && !res.equals("")
            && res.indexOf("type=dir;") == -1) {
            buf.append("x.slaves=" + _name + ";");
        }
        buf.append(res);
        buf.append("\n");
    }

    /**
     * Takes a free channel (polling every 100 ms while none is free),
     * registers a new command on it and sends
     * "&lt;chan&gt; &lt;cmd&gt; &lt;args&gt;" to the slave.
     *
     * @return the registered command; pass it to
     *         {@link #waitForCommand(AsyncCommand)} to wait for the reply
     */
    public AsyncCommand sendCommand(String cmd, String args) {
        String chan = null;
        while (chan == null) {
            // Check and pop under one lock so two senders cannot both see
            // the last free channel.
            synchronized (availcmd) {
                if (!availcmd.isEmpty()) {
                    chan = (String) availcmd.pop();
                }
            }
            if (chan == null) {
                try {
                    sleep(100);
                } catch (InterruptedException ignored) {
                    // keep waiting for a free channel
                }
            }
        }
        AsyncCommand tmp = new AsyncCommand(chan, cmd, args, this);
        commands.put(chan, tmp);
        sendLine(chan + " " + cmd + " " + args);
        return tmp;
    }

    /** Unregisters the command on <code>chan</code> and frees the channel. */
    public void releaseChan(String chan) {
        // Unregister first so a sender that immediately reuses the channel
        // cannot have its new command removed.
        commands.remove(chan);
        availcmd.push(chan);
    }

    /** Delegates to <code>cmd.waitForComplete()</code>. */
    public void waitForCommand(AsyncCommand cmd) {
        cmd.waitForComplete();
    }

    /** Sends a "ping" command and waits for it to complete. */
    public void ping() {
        logger.debug("Trying PING");
        AsyncCommand cmd = sendCommand("ping", "");
        waitForCommand(cmd);
    }

    /**
     * Asks the slave for the checksum of <code>path</code>.
     *
     * @throws IOException if the slave returned no checksum
     */
    public long checkSum(String path) throws IOException {
        AsyncCommand cmd = sendCommand("csum", "\"" + path + "\"");
        waitForCommand(cmd);
        String crc = (String) cmd._data.get("crc32");
        if (crc == null) {
            throw new IOException("No checksum received for " + path);
        }
        // NOTE(review): parsed as decimal here, while xferMessage parses its
        // checksum field as hex - confirm which form the slave sends.
        return Long.parseLong(crc);
    }

    /**
     * Asks the slave for the contents of <code>path</code>.
     *
     * @return the received lines, each terminated by a newline
     * @throws IOException if the slave returned no data
     */
    public String dumpfile(String path) throws IOException {
        AsyncCommand cmd = sendCommand("dump", "\"" + path + "\"");
        waitForCommand(cmd);
        Object data = cmd._data.get("data");
        if (data == null) {
            throw new IOException("No data received for " + path);
        }
        // processDump stores a StringBuffer, not a String.
        return data.toString();
    }

    /** Sends a "dele" command for <code>path</code> and waits for it. */
    public void delete(String path) throws IOException {
        AsyncCommand cmd = sendCommand("dele", "\"" + path + "\"");
        waitForCommand(cmd);
    }

    /**
     * Sends a "renm" command with the quoted source path and the quoted
     * destination "toDirPath/toName", then waits for it.
     */
    public void rename(String from, String toDirPath, String toName)
        throws IOException {
        String args =
            "\"" + from + "\" \"" + toDirPath + "/" + toName + "\"";
        AsyncCommand cmd = sendCommand("renm", args);
        waitForCommand(cmd);
    }

    /** Dumps the file at <code>path</code> and parses it as an SFV file. */
    public SFVFile getSFVFile(String path) throws IOException {
        String sfv = dumpfile(path);
        return new SFVFile(new BufferedReader(new StringReader(sfv)));
    }

    /**
     * Sends "conn" with the host address of <code>addr</code> and wraps the
     * completed command in a transfer. <code>encrypted</code> is not used.
     */
    public Transfer connect(InetSocketAddress addr, boolean encrypted) {
        AsyncCommand cmd =
            sendCommand("conn", addr.getAddress().getHostAddress());
        waitForCommand(cmd);
        return new AsyncTransfer(this, cmd);
    }

    /**
     * Sends "conn" without an address and wraps the completed command in a
     * transfer. <code>encrypted</code> is not used.
     */
    public Transfer listen(boolean encrypted)
        throws RemoteException, IOException {
        AsyncCommand cmd = sendCommand("conn", "");
        waitForCommand(cmd);
        return new AsyncTransfer(this, cmd);
    }

    /**
     * Builds a status snapshot from the active transfers and a fresh "disk"
     * query. If the disk reply lacks usable figures, the previously stored
     * values are reported.
     *
     * @throws FatalException if a transfer has an unrecognized direction
     */
    public SlaveStatus getSlaveStatus() {
        int throughputUp = 0, throughputDown = 0;
        int transfersUp = 0, transfersDown = 0;
        long bytesReceived, bytesSent;
        synchronized (_transfers) {
            bytesReceived = _receivedBytes;
            bytesSent = _sentBytes;
            for (Iterator i = _transfers.iterator(); i.hasNext();) {
                AsyncTransfer transfer = (AsyncTransfer) i.next();
                switch (transfer.getDirection()) {
                    case Transfer.TRANSFER_RECEIVING_UPLOAD :
                        throughputUp += transfer.getXferSpeed();
                        transfersUp += 1;
                        bytesReceived += transfer.getTransfered();
                        break;
                    case Transfer.TRANSFER_SENDING_DOWNLOAD :
                        throughputDown += transfer.getXferSpeed();
                        transfersDown += 1;
                        bytesSent += transfer.getTransfered();
                        break;
                    default :
                        throw new FatalException("unrecognized direction");
                }
            }
        }

        AsyncCommand cmd = sendCommand("disk", "");
        waitForCommand(cmd);
        Hashtable result = cmd._data;
        if (result != null) {
            Object total = result.get("total");
            Object free = result.get("free");
            if (total != null && free != null) {
                try {
                    // Parse both before storing so the pair stays consistent.
                    long parsedTotal = Long.parseLong(total.toString());
                    long parsedFree = Long.parseLong(free.toString());
                    disktotal = parsedTotal;
                    diskfree = parsedFree;
                } catch (NumberFormatException e) {
                    logger.warn(_name + ": unparsable disk reply", e);
                }
            }
        }

        try {
            return new SlaveStatus(
                diskfree,
                disktotal,
                bytesSent,
                bytesReceived,
                throughputUp,
                transfersUp,
                throughputDown,
                transfersDown);
        } catch (Exception ex) {
            logger.warn(_name + ": could not build slave status", ex);
            throw new RuntimeException(ex.toString(), ex);
        }
    }

    /**
     * Sends a "list" command, waits for it and returns the file tree built
     * by the most recent successful listing.
     */
    public LinkedRemoteFile getSlaveRoot() throws IOException {
        AsyncCommand cmd = sendCommand("list", "");
        waitForCommand(cmd);
        return _root;
    }

    /** Unreferenced callback: logs and terminates the JVM. */
    public void unreferenced() {
        logger.info("unreferenced");
        System.exit(0);
    }

    /**
     * Returns the local host address.
     *
     * @throws RuntimeException if the local host cannot be resolved
     */
    public InetAddress getPeerAddress() {
        try {
            return InetAddress.getLocalHost();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}