KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > drftpd > slave > async > AsyncSlave


1 /*
2  * This file is part of DrFTPD, Distributed FTP Daemon.
3  *
4  * DrFTPD is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * DrFTPD is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with DrFTPD; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  */

18
19 package org.drftpd.slave.async;
20
21 import java.io.BufferedReader JavaDoc;
22 import java.io.EOFException JavaDoc;
23 import java.io.IOException JavaDoc;
24 import java.io.InputStreamReader JavaDoc;
25 import java.io.PrintWriter JavaDoc;
26 import java.io.StringReader JavaDoc;
27 import java.net.InetAddress JavaDoc;
28 import java.net.InetSocketAddress JavaDoc;
29 import java.net.Socket JavaDoc;
30 import java.rmi.ConnectIOException JavaDoc;
31 import java.rmi.RemoteException JavaDoc;
32 import java.rmi.server.Unreferenced JavaDoc;
33 import java.security.MessageDigest JavaDoc;
34 import java.util.Hashtable JavaDoc;
35 import java.util.Iterator JavaDoc;
36 import java.util.Random JavaDoc;
37 import java.util.Vector JavaDoc;
38 import java.util.Stack JavaDoc;
39
40 import net.sf.drftpd.FatalException;
41 import net.sf.drftpd.SFVFile;
42 import net.sf.drftpd.master.ConnectionManager;
43 import net.sf.drftpd.remotefile.LinkedRemoteFile;
44 import net.sf.drftpd.remotefile.MLSTSerialize;
45 import net.sf.drftpd.slave.RootBasket;
46 import net.sf.drftpd.slave.Slave;
47 import net.sf.drftpd.slave.SlaveStatus;
48 import net.sf.drftpd.slave.Transfer;
49 import net.sf.drftpd.util.PortRange;
50
51 import org.apache.log4j.Logger;
52
53 /**
54  * @author mog
55  * @version $Id: AsyncSlave.java,v 1.2 2004/05/21 18:42:01 zombiewoof64 Exp $
56  */

57 public class AsyncSlave extends Thread JavaDoc implements Slave, Unreferenced JavaDoc {
58     private static final Logger logger =
59     Logger.getLogger(AsyncSlave.class);
60     
61     private long disktotal = 0;
62     private long diskfree = 0;
63     
64     private ConnectionManager _cman;
65     
66     private boolean _uploadChecksums;
67     private boolean _downloadChecksums;
68     
69     private String JavaDoc _name;
70     private String JavaDoc _spsw;
71     private String JavaDoc _mpsw;
72     private String JavaDoc _host;
73     private int _port;
74     
75     private PortRange _portRange = new PortRange();
76     
77     private long _receivedBytes = 0;
78     private long _sentBytes = 0;
79     
80     private Vector JavaDoc _transfers = new Vector JavaDoc();
81     
82     private Socket JavaDoc _sock = null;
83     private BufferedReader JavaDoc _sinp = null;
84     private PrintWriter JavaDoc _sout = null;
85     
86     protected Hashtable JavaDoc commands = new Hashtable JavaDoc();
87     protected Stack JavaDoc availcmd = new Stack JavaDoc();
88     
89     private LinkedRemoteFile _root;
90     
91     public AsyncSlave(ConnectionManager mgr, Hashtable JavaDoc cfg)
92     throws RemoteException JavaDoc {
93         Socket JavaDoc sock = null;
94         _name = (String JavaDoc) cfg.get("name");
95         _spsw = (String JavaDoc) cfg.get("slavepass");
96         _mpsw = (String JavaDoc) cfg.get("masterpass");
97         _host = (String JavaDoc) cfg.get("addr");
98         _port = Integer.parseInt((String JavaDoc) cfg.get("port"));
99         logger.info("Starting connect to " + _name + "@" + _host + ":" + _port);
100         try {
101             sock = new java.net.Socket JavaDoc(_host, _port);
102         } catch (IOException JavaDoc e) {
103             if (e instanceof ConnectIOException JavaDoc
104             && e.getCause() instanceof EOFException JavaDoc) {
105                 logger.info(
106                 "Check slaves.xml on the master that you are allowed to connect.");
107             }
108             logger.info("IOException: " + e.toString(), e);
109             try {
110                 sock.close();
111             } catch (Exception JavaDoc e1) {
112             }
113             //System.exit(0);
114
} catch (Exception JavaDoc e) {
115             logger.warn("Exception: " + e.toString());
116             try {
117                 if (sock != null)
118                     sock.close();
119             } catch (Exception JavaDoc e2) {
120             }
121         }
122         init(mgr, cfg, sock);
123     }
124     
125     public AsyncSlave(ConnectionManager mgr, Hashtable JavaDoc cfg, Socket JavaDoc sock)
126     throws RemoteException JavaDoc {
127         _name = (String JavaDoc) cfg.get("name");
128         _spsw = (String JavaDoc) cfg.get("slavepass");
129         _mpsw = (String JavaDoc) cfg.get("masterpass");
130         _host = (String JavaDoc) cfg.get("addr");
131         _port = Integer.parseInt((String JavaDoc) cfg.get("port"));
132         init(mgr, cfg, sock);
133     }
134     
135     public void init(ConnectionManager mgr, Hashtable JavaDoc cfg, Socket JavaDoc sock)
136     throws RemoteException JavaDoc {
137         _cman = mgr;
138         _sock = sock;
139         for (int i=0; i<256; i++) {
140             String JavaDoc key = Integer.toHexString(i);
141             if (key.length()<2) key = "0" + key;
142             availcmd.push(key);
143             commands.put(key,null);
144         }
145         try {
146             _sout = new PrintWriter JavaDoc(_sock.getOutputStream(), true);
147             _sinp =
148             new BufferedReader JavaDoc(
149             new InputStreamReader JavaDoc(_sock.getInputStream()));
150             
151             // generate master hash
152
String JavaDoc seed = "";
153             Random JavaDoc rand = new Random JavaDoc();
154             for (int i = 0; i < 16; i++) {
155                 String JavaDoc hex = Integer.toHexString(rand.nextInt(256));
156                 if (hex.length() < 2)
157                     hex = "0" + hex;
158                 seed += hex.substring(hex.length() - 2);
159             }
160             String JavaDoc pass = _mpsw + seed + _spsw;
161             MessageDigest JavaDoc md5 = MessageDigest.getInstance("MD5");
162             md5.reset();
163             md5.update(pass.getBytes());
164             String JavaDoc hash = hash2hex(md5.digest()).toLowerCase();
165             
166             String JavaDoc banner = "INIT " + "servername" + " " + hash + " " + seed;
167             
168             sendLine(banner);
169             
170             // get slave banner
171
String JavaDoc txt = readLine(5);
172             if (txt == null) {
173                 throw new IOException JavaDoc("Slave did not send banner !!");
174             }
175             
176             String JavaDoc sname = "";
177             String JavaDoc spass = "";
178             String JavaDoc sseed = "";
179             try {
180                 String JavaDoc[] items = txt.split(" ");
181                 sname = items[1].trim();
182                 spass = items[2].trim();
183                 sseed = items[3].trim();
184             } catch (Exception JavaDoc e) {
185                 AsyncSlaveListener.invalidSlave("INITFAIL BadKey", _sock);
186             }
187             // generate slave hash
188
pass = _spsw + sseed + _mpsw;
189             md5 = MessageDigest.getInstance("MD5");
190             md5.reset();
191             md5.update(pass.getBytes());
192             hash = hash2hex(md5.digest()).toLowerCase();
193             
194             // authenticate
195
if (!sname.equals(_name)) {
196                 AsyncSlaveListener.invalidSlave("INITFAIL Unknown", _sock);
197             }
198             if (!spass.toLowerCase().equals(hash.toLowerCase())) {
199                 AsyncSlaveListener.invalidSlave("INITFAIL BadKey", _sock);
200             }
201             _cman.getSlaveManager().addSlave(_name, this, getSlaveStatus(), -1);
202             start();
203         } catch (IOException JavaDoc e) {
204             if (e instanceof ConnectIOException JavaDoc
205             && e.getCause() instanceof EOFException JavaDoc) {
206                 logger.info(
207                 "Check slaves.xml on the master that you are allowed to connect.");
208             }
209             logger.info("IOException: " + e.toString());
210             try {
211                 sock.close();
212             } catch (Exception JavaDoc e1) {
213             }
214             //System.exit(0);
215
} catch (Exception JavaDoc e) {
216             logger.warn("Exception: " + e.toString());
217             try {
218                 sock.close();
219             } catch (Exception JavaDoc e2) {
220             }
221         }
222         System.gc();
223     }
224     
225     private String JavaDoc hash2hex(byte[] bytes) {
226         String JavaDoc res = "";
227         for (int i = 0; i < 16; i++) {
228             String JavaDoc hex = Integer.toHexString((int) bytes[i]);
229             if (hex.length() < 2)
230                 hex = "0" + hex;
231             res += hex.substring(hex.length() - 2);
232         }
233         return res;
234     }
235     
236     //*********************************
237
// General property accessors
238
//*********************************
239

240     public InetAddress JavaDoc getAddress() {
241         return _sock.getInetAddress();
242     }
243     
244     public boolean getDownloadChecksums() {
245         return _downloadChecksums;
246     }
247     
248     public boolean getUploadChecksums() {
249         return _uploadChecksums;
250     }
251     
252     public RootBasket getRoots() {
253         return null;
254     }
255     
256     //*********************************
257
// Control Socket Processing Thread
258
//*********************************
259

260     public void run() {
261         int tick = 0;
262         while (true) {
263             try { sleep(100); } catch (Exception JavaDoc e) {}
264             String JavaDoc msg = readLine(0);
265             if (msg == null) {
266                 // do something to indicate the connection died
267
shutdown();
268                 return;
269             }
270             while (msg != "") {
271                 String JavaDoc[] items = msg.split(",");
272                 AsyncCommand cmd = (AsyncCommand)commands.get(items[0]);
273                 msg = msg.substring(3); // strip off channel ID
274
if (cmd._name.equals("ping")) processPing(cmd,msg);
275                 else if (cmd._name.equals("xfer")) xferMessage(msg);
276                 else if (cmd._name.equals("disk")) processDisk(cmd,msg);
277                 else if (cmd._name.equals("conn")) processConnect(cmd,msg);
278                 else if (cmd._name.equals("send")) processSend(cmd,msg);
279                 else if (cmd._name.equals("recv")) processRecv(cmd,msg);
280                 else if (cmd._name.equals("renm")) processRename(cmd,msg);
281                 else if (cmd._name.equals("dele")) processDelete(cmd,msg);
282                 else if (cmd._name.equals("list")) processList(cmd,msg);
283                 else if (cmd._name.equals("csum")) processCRC32(cmd,msg);
284                 else if (cmd._name.equals("dump")) processDump(cmd,msg);
285                 msg = readLine(0);
286                 if (msg == null) {
287                     // do something to indicate the connection died
288
shutdown();
289                     return;
290                 }
291             }
292         }
293     }
294     
295     private void shutdown() {
296         // fatal error occured, close it all down
297
// notify SlaveManager that we are dead
298
try {
299             _cman.getSlaveManager().delSlave(_name, "Connection lost");
300         } catch (Exception JavaDoc e) {
301         }
302     }
303     
304     //*********************************
305
// Control Socket I/O Methods
306
//*********************************
307

308     public void sendLine(String JavaDoc line) {
309         synchronized (_sout) {
310             _sout.println(line);
311             logger.info(_name + "< " + line);
312         }
313     }
314     
315     private String JavaDoc readLine() {
316         return readLine(-1);
317     }
318     
319     private String JavaDoc readLine(int secs) {
320         int cnt = secs * 10;
321         try {
322             while (true) {
323                 while (!_sinp.ready()) {
324                     if (cnt < 1)
325                         return "";
326                     sleep(100);
327                     cnt--;
328                     if (cnt == 0)
329                         return "";
330                 }
331                 String JavaDoc txt = _sinp.readLine();
332                 if (txt == null) return null;
333                 logger.info(_name + "> " + txt);
334                 return txt;
335             }
336         } catch (Exception JavaDoc e) {
337             return null;
338         }
339     }
340     
341     //*********************************
342
// Active Transfer Managment Methods
343
//*********************************
344

345     private void xferMessage(String JavaDoc msg) {
346         String JavaDoc[] items = msg.split(" ");
347         String JavaDoc xid = items[1];
348         String JavaDoc sta = items[2];
349         String JavaDoc cnt = items[3];
350         String JavaDoc sum = items[4];
351         String JavaDoc eid = items[5];
352         String JavaDoc adr = items[6];
353         
354         long tid = Long.parseLong(xid);
355         long byt = Long.parseLong(cnt);
356         long err = Long.parseLong(eid);
357         long crc = Long.parseLong(sum, 16);
358         
359         synchronized (_transfers) {
360             for (int i = 0; i < _transfers.size(); i++) {
361                 AsyncTransfer tmp = (AsyncTransfer) _transfers.get(i);
362                 if (tmp.getID() == tid) {
363                     // update transfer status
364
tmp.updateStats(sta, byt, crc, err, adr);
365                     break;
366                 }
367             }
368         }
369     }
370     
371     public void addTransfer(AsyncTransfer transfer) {
372         synchronized (_transfers) {
373             _transfers.add(transfer);
374         }
375     }
376     
377     public void removeTransfer(AsyncTransfer transfer) {
378         synchronized (_transfers) {
379             switch (transfer.getDirection()) {
380                 case Transfer.TRANSFER_RECEIVING_UPLOAD :
381                     _receivedBytes += transfer.getTransfered();
382                     break;
383                 case Transfer.TRANSFER_SENDING_DOWNLOAD :
384                     _sentBytes += transfer.getTransfered();
385                     break;
386                 default :
387                     throw new IllegalArgumentException JavaDoc();
388             }
389             if (!_transfers.remove(transfer))
390                 throw new IllegalStateException JavaDoc();
391         }
392     }
393     
394     //*********************************
395
// Command Response Processing Methods
396
//*********************************
397

398     private void processPing(AsyncCommand cmd, String JavaDoc res) {
399         cmd.setStatus(0);
400     }
401     
402     private void processCRC32(AsyncCommand cmd, String JavaDoc res) {
403         if (res.startsWith("CRCFAIL ")) {
404             cmd.setStatus(1);
405             return;
406         }
407         cmd._data.put("crc32", res.substring(6));
408         cmd.setStatus(0);
409     }
410     
411     private void processDump(AsyncCommand cmd, String JavaDoc res) {
412         if (res.startsWith("DUMPFAIL ")) {
413             cmd.setStatus(1);
414             return;
415         }
416         if (!cmd._data.containsKey("data")) {
417             cmd._data.put("data", new StringBuffer JavaDoc(65536));
418         }
419         if (res.equals("DUMPEND")) {
420             cmd.setStatus(0);
421         }
422         StringBuffer JavaDoc buf = (StringBuffer JavaDoc)cmd._data.get("data");
423         buf.append(res);
424     }
425     
426     private void processDelete(AsyncCommand cmd, String JavaDoc res) {
427         if (res.startsWith("DELFAIL "))
428             cmd.setStatus(1);
429         else
430             cmd.setStatus(0);
431     }
432     
433     private void processRename(AsyncCommand cmd, String JavaDoc res) {
434         if (res.startsWith("RENFAIL "))
435             cmd.setStatus(1);
436         else
437             cmd.setStatus(0);
438     }
439     
440     private void processSend(AsyncCommand cmd, String JavaDoc res) {
441         if (res.startsWith("SENDFAIL ")) {
442             cmd.setStatus(-1);
443         } else {
444             // parse response
445
cmd.setStatus(0);
446         }
447     }
448     
449     private void processRecv(AsyncCommand cmd, String JavaDoc res) {
450         if (res.startsWith("RECVFAIL ")) {
451             cmd.setStatus(-1);
452         } else {
453             // parse response
454
cmd.setStatus(0);
455         }
456     }
457     
458     private void processConnect(AsyncCommand cmd, String JavaDoc res) {
459         if (res.startsWith("CONNFAIL ")) {
460             cmd.setStatus(-1);
461         } else {
462             // parse response
463
String JavaDoc[] items = res.split(" ");
464             cmd.getData().put("conn", items[1]);
465             if (items.length > 2)
466                 cmd.getData().put("addr", items[2]);
467             cmd.setStatus(0);
468         }
469     }
470     
471     private void processDisk(AsyncCommand cmd, String JavaDoc res) {
472         if (res.startsWith("DISKFAIL ")) {
473             cmd.setStatus(-1);
474         } else {
475             // parse response
476
String JavaDoc[] items = res.split(" ");
477             cmd.getData().put("total", items[1]);
478             cmd.getData().put("free", items[2]);
479         }
480     }
481     
482     private void processList(AsyncCommand cmd, String JavaDoc res) {
483         if (!cmd._data.containsKey("data")) {
484             cmd._data.put("data", new StringBuffer JavaDoc(65536));
485         }
486         if (res.equals("DUMPEND")) {
487             cmd.setStatus(0);
488         }
489         StringBuffer JavaDoc buf = (StringBuffer JavaDoc)cmd._data.get("data");
490         if (res.startsWith("LISTFAIL")) {
491             cmd.setStatus(-1);
492         } else {
493             if (res.equals("LISTEND")) {
494                 try {
495                     LinkedRemoteFile root =
496                     MLSTSerialize.unserialize(
497                     _cman.getConfig(),
498                     new StringReader JavaDoc(buf.toString()),
499                     _cman.getSlaveManager().getSlaveList());
500                     _root = root;
501                 } catch (Exception JavaDoc e) {
502                     logger.info("LIST Exception from " + getName(), e);
503                 }
504                 return;
505             }
506             if (res.equals("LISTBEGIN")) {
507                 return;
508             }
509             if (!res.startsWith("/")
510             && !res.equals("")
511             && res.indexOf("type=dir;") == -1)
512                 buf.append("x.slaves=" + _name + ";");
513             buf.append(res);
514             buf.append((String JavaDoc) "\n");
515         }
516     }
517     
518     
519     //*********************************
520
// Synchronous action methods
521
//*********************************
522

523     public AsyncCommand sendCommand(String JavaDoc cmd, String JavaDoc args) {
524         while (availcmd.size() == 0) {
525             try { sleep(100); } catch (Exception JavaDoc e) {}
526         }
527         String JavaDoc chan = (String JavaDoc)availcmd.pop();
528         AsyncCommand tmp = new AsyncCommand(chan, cmd, args, this);
529         commands.put(chan, tmp);
530         String JavaDoc msg = chan + " " + cmd + " " + args;
531         sendLine(msg);
532         return tmp;
533     }
534     
535     public void releaseChan(String JavaDoc chan)
536     {
537         availcmd.push(chan);
538         commands.put(chan, null);
539     }
540     
541     public void waitForCommand(AsyncCommand cmd) {
542         cmd.waitForComplete();
543     }
544     
545     public void ping() {
546         logger.debug("Trying PING");
547         AsyncCommand cmd = sendCommand("ping", "");
548         waitForCommand(cmd);
549         return;
550     }
551     
552     public long checkSum(String JavaDoc path) throws IOException JavaDoc {
553         AsyncCommand cmd = sendCommand("csum", "\"" + path + "\"");
554         waitForCommand(cmd);
555         return Integer.parseInt((String JavaDoc) cmd._data.get("crc32"));
556     }
557     
558     public String JavaDoc dumpfile(String JavaDoc path) throws IOException JavaDoc {
559         AsyncCommand cmd = sendCommand("dump", "\"" + path + "\"");
560         waitForCommand(cmd);
561         return (String JavaDoc) cmd._data.get("data");
562     }
563     
564     public void delete(String JavaDoc path) throws IOException JavaDoc {
565         AsyncCommand cmd = sendCommand("dele", "\"" + path + "\"");
566         waitForCommand(cmd);
567     }
568     
569     public void rename(String JavaDoc from, String JavaDoc toDirPath, String JavaDoc toName)
570     throws IOException JavaDoc {
571         AsyncCommand cmd = sendCommand("renm", "\"" + from + "\" "+ toDirPath + "/" + toName + "\"");
572         waitForCommand(cmd);
573     }
574     
575     public SFVFile getSFVFile(String JavaDoc path) throws IOException JavaDoc {
576         String JavaDoc sfv = dumpfile(path);
577         return new SFVFile(new BufferedReader JavaDoc(new StringReader JavaDoc(sfv)));
578     }
579     
580     public Transfer connect(InetSocketAddress JavaDoc addr, boolean encrypted) {
581         AsyncCommand cmd = sendCommand("conn", addr.getAddress().getHostAddress());
582         waitForCommand(cmd);
583         Transfer tmp = (Transfer) new AsyncTransfer(this, cmd);
584         return tmp;
585     }
586     
587     public Transfer listen(boolean encrypted)
588     throws RemoteException JavaDoc, IOException JavaDoc {
589         AsyncCommand cmd = sendCommand("conn", "");
590         waitForCommand(cmd);
591         Transfer tmp = (Transfer) new AsyncTransfer(this, cmd);
592         return tmp;
593     }
594     
595     public SlaveStatus getSlaveStatus() {
596         int throughputUp = 0, throughputDown = 0;
597         int transfersUp = 0, transfersDown = 0;
598         long bytesReceived, bytesSent;
599         synchronized (_transfers) {
600             bytesReceived = _receivedBytes;
601             bytesSent = _sentBytes;
602             for (Iterator JavaDoc i = _transfers.iterator(); i.hasNext();) {
603                 AsyncTransfer transfer = (AsyncTransfer) i.next();
604                 switch (transfer.getDirection()) {
605                     case Transfer.TRANSFER_RECEIVING_UPLOAD :
606                         throughputUp += transfer.getXferSpeed();
607                         transfersUp += 1;
608                         bytesReceived += transfer.getTransfered();
609                         break;
610                     case Transfer.TRANSFER_SENDING_DOWNLOAD :
611                         throughputDown += transfer.getXferSpeed();
612                         transfersDown += 1;
613                         bytesSent += transfer.getTransfered();
614                         break;
615                     default :
616                         throw new FatalException("unrecognized direction");
617                 }
618             }
619         }
620         AsyncCommand cmd = sendCommand("disk", "");
621         waitForCommand(cmd);
622         Hashtable JavaDoc result = cmd._data;
623         long dtotal, dfree;
624         if (result != null) {
625             disktotal = Long.parseLong(result.get("total").toString());
626             diskfree = Long.parseLong(result.get("free").toString());
627         }
628         try {
629             return new SlaveStatus(
630             diskfree,
631             //_roots.getTotalDiskSpaceAvailable(),
632
disktotal, //_roots.getTotalDiskSpaceCapacity(),
633
bytesSent,
634             bytesReceived,
635             throughputUp,
636             transfersUp,
637             throughputDown,
638             transfersDown);
639         } catch (Exception JavaDoc ex) {
640             ex.printStackTrace();
641             throw new RuntimeException JavaDoc(ex.toString());
642         }
643     }
644     
645     public LinkedRemoteFile getSlaveRoot() throws IOException JavaDoc {
646         AsyncCommand cmd = sendCommand("list", "");
647         waitForCommand(cmd);
648         return _root;
649     }
650     
651     //*********************************
652
// Leftovers from original SlaveImpl
653
//*********************************
654

655     public void unreferenced() {
656         logger.info("unreferenced");
657         System.exit(0);
658     }
659     
660     public InetAddress JavaDoc getPeerAddress() {
661         try {
662             return InetAddress.getLocalHost();
663         } catch (Exception JavaDoc e) {
664             throw new RuntimeException JavaDoc();
665         }
666     }
667     
668 }
669
Popular Tags