KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > sf > drftpd > mirroring > JobManager


1 /*
2  * This file is part of DrFTPD, Distributed FTP Daemon.
3  *
4  * DrFTPD is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * DrFTPD is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with DrFTPD; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  */

18 package net.sf.drftpd.mirroring;
19 import java.io.FileInputStream JavaDoc;
20 import java.io.IOException JavaDoc;
21 import java.rmi.RemoteException JavaDoc;
22 import java.util.ArrayList JavaDoc;
23 import java.util.Collection JavaDoc;
24 import java.util.Collections JavaDoc;
25 import java.util.HashSet JavaDoc;
26 import java.util.Iterator JavaDoc;
27 import java.util.List JavaDoc;
28 import java.util.Properties JavaDoc;
29 import java.util.Set JavaDoc;
30
31 import net.sf.drftpd.FatalException;
32 import net.sf.drftpd.NoAvailableSlaveException;
33 import net.sf.drftpd.FileExistsException;
34 import net.sf.drftpd.SlaveUnavailableException;
35 import net.sf.drftpd.master.ConnectionManager;
36 import net.sf.drftpd.master.RemoteSlave;
37 import net.sf.drftpd.master.config.FtpConfig;
38 import org.apache.log4j.Logger;
39 /**
40  * @author zubov
41  * @version $Id: JobManager.java,v 1.47.2.1 2004/06/11 00:59:58 mog Exp $
42  */

43 public class JobManager implements Runnable JavaDoc {
44     private static final Logger logger = Logger.getLogger(JobManager.class);
45     private ConnectionManager _cm;
46     private boolean _isStopped = false;
47     private ArrayList JavaDoc _jobList = new ArrayList JavaDoc();
48     private boolean _useCRC;
49     private Thread JavaDoc thread;
50     private int _sleepSeconds;
51     /**
52      * Keeps track of all jobs and controls them
53      */

54     public JobManager(ConnectionManager cm) throws IOException JavaDoc {
55         _cm = cm;
56         reload();
57     }
58     protected JobManager(ConnectionManager cm, Properties JavaDoc p) {
59         _cm = cm;
60         reload(p);
61     }
62     public void startJobs() {
63         if (thread != null) {
64             stopJobs();
65             thread.interrupt();
66             while (thread.isAlive()) {
67                 logger.debug("thread is still alive");
68                 Thread.yield();
69             }
70         }
71         _isStopped = false;
72         thread = new Thread JavaDoc(this, "JobTransferStarter");
73         thread.start();
74     }
75
76     public void stopJobs() {
77         _isStopped = true;
78     }
79
80     public boolean isStopped() {
81         return _isStopped;
82     }
83
84     public synchronized void addJob(Job job) {
85         Collection JavaDoc slaves = job.getFile().getSlaves();
86         for (Iterator JavaDoc iter = slaves.iterator();
87             iter.hasNext();
88             ) {
89             RemoteSlave slave = (RemoteSlave) iter.next();
90             if (job.getDestinationSlaves().contains(slave)) {
91                 job.sentToSlave(slave);
92             }
93         }
94         if (job.isDone())
95             return;
96         _jobList.add(job);
97         Collections.sort(_jobList, new JobComparator());
98     }
99     /**
100      * Gets all jobs.
101      */

102     public synchronized List JavaDoc getAllJobs() {
103         return Collections.unmodifiableList(_jobList);
104     }
105     
106     public synchronized Job getNextJob(Set JavaDoc busySlaves, Set JavaDoc skipJobs) {
107         for (Iterator JavaDoc iter = _jobList.iterator(); iter.hasNext();) {
108             Job tempJob = (Job) iter.next();
109             if (tempJob.getFile().isDeleted() || tempJob.isDone()) {
110                 iter.remove();
111                 continue;
112             }
113             if (skipJobs.contains(tempJob))
114                 continue;
115             Collection JavaDoc availableSlaves = null;
116             try {
117                 availableSlaves = tempJob.getFile().getAvailableSlaves();
118             } catch (NoAvailableSlaveException e) {
119                 continue; // can't transfer what isn't online
120
}
121             if (!busySlaves.containsAll(availableSlaves)) {
122                 return tempJob;
123             }
124         }
125         return null;
126     }
127
128     /**
129      * Returns true if the file was sent okay
130      */

131     public boolean processJob() {
132         Job job = null;
133         RemoteSlave sourceSlave = null;
134         RemoteSlave destSlave = null;
135         long time;
136         long difference;
137         synchronized (this) {
138             Collection JavaDoc availableSlaves;
139             try {
140                 availableSlaves = _cm.getSlaveManager().getAvailableSlaves();
141             } catch (NoAvailableSlaveException e1) {
142                 return false;
143                 // can't transfer with no slaves
144
}
145             Set JavaDoc busySlavesDown = new HashSet JavaDoc();
146             Set JavaDoc skipJobs = new HashSet JavaDoc();
147             while (!busySlavesDown.containsAll(availableSlaves)) {
148                 job = getNextJob(busySlavesDown, skipJobs);
149                 if (job == null) {
150                     return false;
151                 }
152                 logger.debug("looking up slave for job " + job);
153                 try {
154                     sourceSlave =
155                         _cm
156                             .getSlaveManager()
157                             .getSlaveSelectionManager()
158                             .getASlaveForJobDownload(
159                             job);
160                 } catch (NoAvailableSlaveException e) {
161                     try {
162                         busySlavesDown.addAll(job.getFile().getAvailableSlaves());
163                     } catch (NoAvailableSlaveException e2) {
164                     }
165                     continue;
166                 }
167                 if (sourceSlave == null) {
168                     logger.debug(
169                         "JobManager was unable to find a suitable job for transfer");
170                     return false;
171                 }
172                 try {
173                     destSlave =
174                         _cm
175                             .getSlaveManager()
176                             .getSlaveSelectionManager()
177                             .getASlaveForJobUpload(
178                             job);
179                     break; // we have a source slave and a destination slave, transfer!
180
} catch (NoAvailableSlaveException e) {
181                     // job was ready to be sent, but it had no slave that was ready to accept it
182
skipJobs.add(job);
183                     continue;
184                 }
185             }
186             if (destSlave == null) {
187                 logger.debug("destSlave is null, all destination slaves are busy" + job);
188                 return false;
189             }
190             logger.debug(
191                 "ready to transfer "
192                     + job
193                     + " from "
194                     + sourceSlave.getName()
195                     + " to "
196                     + destSlave.getName());
197             time = System.currentTimeMillis();
198             difference = 0;
199             removeJob(job);
200         }
201         // job is not deleted and is out of the jobList, we are ready to
202
// process
203
logger.info(
204             "Sending "
205                 + job.getFile().getName()
206                 + " from "
207                 + sourceSlave.getName()
208                 + " to "
209                 + destSlave.getName());
210         SlaveTransfer slaveTransfer =
211             new SlaveTransfer(job.getFile(), sourceSlave, destSlave);
212         try {
213             if (!slaveTransfer.transfer(useCRC())) { // crc failed
214
try {
215                     destSlave.getSlave().delete(job.getFile().getPath());
216                 } catch (IOException JavaDoc e) {
217                     //couldn't delete it, just carry on
218
}
219                 logger.debug(
220                     "CRC did not match for "
221                         + job.getFile()
222                         + " when sending from "
223                         + sourceSlave.getName()
224                         + " to "
225                         + destSlave.getName());
226                 addJob(job);
227                 return false;
228             }
229         } catch (IOException JavaDoc e) {
230             logger.debug(
231                 "Caught IOException in sending "
232                     + job.getFile().getName()
233                     + " from "
234                     + sourceSlave.getName()
235                     + " to "
236                     + destSlave.getName(),
237                 e);
238             if (!(e instanceof FileExistsException)) {
239                 try {
240                     destSlave.getSlave().delete(job.getFile().getPath());
241                 } catch (SlaveUnavailableException e3) {
242                     //couldn't delete it, just carry on
243
} catch (IOException JavaDoc e1) {
244                     //couldn't delete it, just carry on
245
}
246                 addJob(job);
247                 return false;
248             }
249             logger.debug(
250                 "File "
251                     + job.getFile()
252                     + " was already on the destination slave");
253             try {
254                 if (destSlave.getSlave().checkSum(job.getFile().getPath())
255                     == job.getFile().getCheckSum()) {
256                     logger.debug("Accepting file because the crc's match");
257                 } else {
258                     try {
259                         destSlave.getSlave().delete(job.getFile().getPath());
260                     } catch (SlaveUnavailableException e3) {
261                         //couldn't delete it, just carry on
262
} catch (IOException JavaDoc e1) {
263                         //couldn't delete it, just carry on
264
}
265                     addJob(job);
266                     return false;
267                 }
268             } catch (RemoteException JavaDoc e1) {
269                 destSlave.handleRemoteException(e1);
270                 addJob(job);
271                 return false;
272             } catch (NoAvailableSlaveException e1) {
273                 addJob(job);
274                 return false;
275             } catch (SlaveUnavailableException e2) {
276                 addJob(job);
277                 return false;
278             } catch (IOException JavaDoc e1) {
279                 addJob(job);
280                 return false;
281             }
282         } catch (Exception JavaDoc e) {
283             logger.debug(
284                 "Error Sending "
285                     + job.getFile().getName()
286                     + " from "
287                     + sourceSlave.getName()
288                     + " to "
289                     + destSlave.getName(),
290                 e);
291             addJob(job);
292             return false;
293         }
294         difference = System.currentTimeMillis() - time;
295         logger.debug(
296             "Sent file "
297                 + job.getFile().getName()
298                 + " to "
299                 + destSlave.getName()
300                 + " from "
301                 + sourceSlave.getName());
302         job.addTimeSpent(difference);
303         job.sentToSlave(destSlave);
304         if (job.isDone()) {
305             logger.debug("Job is finished, removing job " + job.getFile());
306         } else {
307             addJob(job);
308         }
309         return true;
310     }
311     public void reload() {
312         Properties JavaDoc p = new Properties JavaDoc();
313         try {
314             p.load(new FileInputStream JavaDoc("conf/jobmanager.conf"));
315         } catch (IOException JavaDoc e) {
316             throw new FatalException(e);
317         }
318         reload(p);
319     }
320     protected void reload(Properties JavaDoc p) {
321         _useCRC = p.getProperty("useCRC", "true").equals("true");
322         _sleepSeconds =
323             1000 * Integer.parseInt(FtpConfig.getProperty(p, "sleepSeconds"));
324     }
325     public synchronized void removeJob(Job job) {
326         _jobList.remove(job);
327         Collections.sort(_jobList, new JobComparator());
328     }
329
330     private boolean useCRC() {
331         return _useCRC;
332     }
333     
334     public void stopJob(Job job) {
335         removeJob(job);
336         job.setDone();
337     }
338
339     public void run() {
340         while (true) {
341             if (isStopped()) {
342                 logger.debug("Stopping JobTransferStarter thread");
343                 return;
344             }
345             new JobTransferThread(this).start();
346             try {
347                 Thread.sleep(_sleepSeconds);
348             } catch (InterruptedException JavaDoc e) {
349             }
350         }
351     }
352
353 }
354
Popular Tags