1 18 package net.sf.drftpd.mirroring; 19 import java.io.FileInputStream ; 20 import java.io.IOException ; 21 import java.rmi.RemoteException ; 22 import java.util.ArrayList ; 23 import java.util.Collection ; 24 import java.util.Collections ; 25 import java.util.HashSet ; 26 import java.util.Iterator ; 27 import java.util.List ; 28 import java.util.Properties ; 29 import java.util.Set ; 30 31 import net.sf.drftpd.FatalException; 32 import net.sf.drftpd.NoAvailableSlaveException; 33 import net.sf.drftpd.FileExistsException; 34 import net.sf.drftpd.SlaveUnavailableException; 35 import net.sf.drftpd.master.ConnectionManager; 36 import net.sf.drftpd.master.RemoteSlave; 37 import net.sf.drftpd.master.config.FtpConfig; 38 import org.apache.log4j.Logger; 39 43 public class JobManager implements Runnable { 44 private static final Logger logger = Logger.getLogger(JobManager.class); 45 private ConnectionManager _cm; 46 private boolean _isStopped = false; 47 private ArrayList _jobList = new ArrayList (); 48 private boolean _useCRC; 49 private Thread thread; 50 private int _sleepSeconds; 51 54 public JobManager(ConnectionManager cm) throws IOException { 55 _cm = cm; 56 reload(); 57 } 58 protected JobManager(ConnectionManager cm, Properties p) { 59 _cm = cm; 60 reload(p); 61 } 62 public void startJobs() { 63 if (thread != null) { 64 stopJobs(); 65 thread.interrupt(); 66 while (thread.isAlive()) { 67 logger.debug("thread is still alive"); 68 Thread.yield(); 69 } 70 } 71 _isStopped = false; 72 thread = new Thread (this, "JobTransferStarter"); 73 thread.start(); 74 } 75 76 public void stopJobs() { 77 _isStopped = true; 78 } 79 80 public boolean isStopped() { 81 return _isStopped; 82 } 83 84 public synchronized void addJob(Job job) { 85 Collection slaves = job.getFile().getSlaves(); 86 for (Iterator iter = slaves.iterator(); 87 iter.hasNext(); 88 ) { 89 RemoteSlave slave = (RemoteSlave) iter.next(); 90 if (job.getDestinationSlaves().contains(slave)) { 91 job.sentToSlave(slave); 92 } 93 } 94 if (job.isDone()) 95 return; 96 _jobList.add(job); 97 Collections.sort(_jobList, new JobComparator()); 98 } 99 102 public synchronized List getAllJobs() { 103 return Collections.unmodifiableList(_jobList); 104 } 105 106 public synchronized Job getNextJob(Set busySlaves, Set skipJobs) { 107 for (Iterator iter = _jobList.iterator(); iter.hasNext();) { 108 Job tempJob = (Job) iter.next(); 109 if (tempJob.getFile().isDeleted() || tempJob.isDone()) { 110 iter.remove(); 111 continue; 112 } 113 if (skipJobs.contains(tempJob)) 114 continue; 115 Collection availableSlaves = null; 116 try { 117 availableSlaves = tempJob.getFile().getAvailableSlaves(); 118 } catch (NoAvailableSlaveException e) { 119 continue; } 121 if (!busySlaves.containsAll(availableSlaves)) { 122 return tempJob; 123 } 124 } 125 return null; 126 } 127 128 131 public boolean processJob() { 132 Job job = null; 133 RemoteSlave sourceSlave = null; 134 RemoteSlave destSlave = null; 135 long time; 136 long difference; 137 synchronized (this) { 138 Collection availableSlaves; 139 try { 140 availableSlaves = _cm.getSlaveManager().getAvailableSlaves(); 141 } catch (NoAvailableSlaveException e1) { 142 return false; 143 } 145 Set busySlavesDown = new HashSet (); 146 Set skipJobs = new HashSet (); 147 while (!busySlavesDown.containsAll(availableSlaves)) { 148 job = getNextJob(busySlavesDown, skipJobs); 149 if (job == null) { 150 return false; 151 } 152 logger.debug("looking up slave for job " + job); 153 try { 154 sourceSlave = 155 _cm 156 .getSlaveManager() 157 .getSlaveSelectionManager() 158 .getASlaveForJobDownload( 159 job); 160 } catch (NoAvailableSlaveException e) { 161 try { 162 busySlavesDown.addAll(job.getFile().getAvailableSlaves()); 163 } catch (NoAvailableSlaveException e2) { 164 } 165 continue; 166 } 167 if (sourceSlave == null) { 168 logger.debug( 169 "JobManager was unable to find a suitable job for transfer"); 170 return false; 171 } 172 try { 173 destSlave = 174 _cm 175 .getSlaveManager() 176 .getSlaveSelectionManager() 177 .getASlaveForJobUpload( 178 job); 179 break; } catch (NoAvailableSlaveException e) { 181 skipJobs.add(job); 183 continue; 184 } 185 } 186 if (destSlave == null) { 187 logger.debug("destSlave is null, all destination slaves are busy" + job); 188 return false; 189 } 190 logger.debug( 191 "ready to transfer " 192 + job 193 + " from " 194 + sourceSlave.getName() 195 + " to " 196 + destSlave.getName()); 197 time = System.currentTimeMillis(); 198 difference = 0; 199 removeJob(job); 200 } 201 logger.info( 204 "Sending " 205 + job.getFile().getName() 206 + " from " 207 + sourceSlave.getName() 208 + " to " 209 + destSlave.getName()); 210 SlaveTransfer slaveTransfer = 211 new SlaveTransfer(job.getFile(), sourceSlave, destSlave); 212 try { 213 if (!slaveTransfer.transfer(useCRC())) { try { 215 destSlave.getSlave().delete(job.getFile().getPath()); 216 } catch (IOException e) { 217 } 219 logger.debug( 220 "CRC did not match for " 221 + job.getFile() 222 + " when sending from " 223 + sourceSlave.getName() 224 + " to " 225 + destSlave.getName()); 226 addJob(job); 227 return false; 228 } 229 } catch (IOException e) { 230 logger.debug( 231 "Caught IOException in sending " 232 + job.getFile().getName() 233 + " from " 234 + sourceSlave.getName() 235 + " to " 236 + destSlave.getName(), 237 e); 238 if (!(e instanceof FileExistsException)) { 239 try { 240 destSlave.getSlave().delete(job.getFile().getPath()); 241 } catch (SlaveUnavailableException e3) { 242 } catch (IOException e1) { 244 } 246 addJob(job); 247 return false; 248 } 249 logger.debug( 250 "File " 251 + job.getFile() 252 + " was already on the destination slave"); 253 try { 254 if (destSlave.getSlave().checkSum(job.getFile().getPath()) 255 == job.getFile().getCheckSum()) { 256 logger.debug("Accepting file because the crc's match"); 257 } else { 258 try { 259 destSlave.getSlave().delete(job.getFile().getPath()); 260 } catch (SlaveUnavailableException e3) { 261 } catch (IOException e1) { 263 } 265 addJob(job); 266 return false; 267 } 268 } catch (RemoteException e1) { 269 destSlave.handleRemoteException(e1); 270 addJob(job); 271 return false; 272 } catch (NoAvailableSlaveException e1) { 273 addJob(job); 274 return false; 275 } catch (SlaveUnavailableException e2) { 276 addJob(job); 277 return false; 278 } catch (IOException e1) { 279 addJob(job); 280 return false; 281 } 282 } catch (Exception e) { 283 logger.debug( 284 "Error Sending " 285 + job.getFile().getName() 286 + " from " 287 + sourceSlave.getName() 288 + " to " 289 + destSlave.getName(), 290 e); 291 addJob(job); 292 return false; 293 } 294 difference = System.currentTimeMillis() - time; 295 logger.debug( 296 "Sent file " 297 + job.getFile().getName() 298 + " to " 299 + destSlave.getName() 300 + " from " 301 + sourceSlave.getName()); 302 job.addTimeSpent(difference); 303 job.sentToSlave(destSlave); 304 if (job.isDone()) { 305 logger.debug("Job is finished, removing job " + job.getFile()); 306 } else { 307 addJob(job); 308 } 309 return true; 310 } 311 public void reload() { 312 Properties p = new Properties (); 313 try { 314 p.load(new FileInputStream ("conf/jobmanager.conf")); 315 } catch (IOException e) { 316 throw new FatalException(e); 317 } 318 reload(p); 319 } 320 protected void reload(Properties p) { 321 _useCRC = p.getProperty("useCRC", "true").equals("true"); 322 _sleepSeconds = 323 1000 * Integer.parseInt(FtpConfig.getProperty(p, "sleepSeconds")); 324 } 325 public synchronized void removeJob(Job job) { 326 _jobList.remove(job); 327 Collections.sort(_jobList, new JobComparator()); 328 } 329 330 private boolean useCRC() { 331 return _useCRC; 332 } 333 334 public void stopJob(Job job) { 335 removeJob(job); 336 job.setDone(); 337 } 338 339 public void run() { 340 while (true) { 341 if (isStopped()) { 342 logger.debug("Stopping JobTransferStarter thread"); 343 return; 344 } 345 new JobTransferThread(this).start(); 346 try { 347 Thread.sleep(_sleepSeconds); 348 } catch (InterruptedException e) { 349 } 350 } 351 } 352 353 } 354 | Popular Tags |