1 22 package org.jboss.ha.framework.server; 23 24 import org.jboss.ha.framework.interfaces.HAPartition; 25 import org.jboss.ha.framework.interfaces.ClusterNode; 26 import org.jboss.system.server.ServerConfigLocator; 27 import org.jboss.logging.Logger; 28 import org.jboss.ha.framework.interfaces.HAPartition.AsynchHAMembershipListener; 29 30 import java.util.*; 31 import java.io.*; 32 33 40 public class ClusterFileTransfer implements AsynchHAMembershipListener { 41 42 private static final int MAX_CHUNK_BUFFER_SIZE = 512 * 1024; 46 47 private Map mPushsInProcess = Collections.synchronizedMap(new HashMap()); 49 50 private Map mPullsInProcess = Collections.synchronizedMap(new HashMap()); 52 53 private HAPartition mPartition; 54 55 private static final File TEMP_DIRECTORY = ServerConfigLocator.locate().getServerTempDir(); 56 57 private Map mParentFolders = null; 61 62 private static final String SERVICE_NAME = ClusterFileTransfer.class.getName() + "Service"; 63 64 private static final Logger log = Logger.getLogger(ClusterFileTransfer.class.getName()); 65 66 73 public ClusterFileTransfer(HAPartition partition, Map destinationDirectoryMap) 74 { 75 this.mPartition = partition; 76 this.mPartition.registerRPCHandler(SERVICE_NAME, this); 77 this.mPartition.registerMembershipListener(this); 78 mParentFolders = destinationDirectoryMap; 79 } 80 81 88 public void pull(File file, String parentName) throws ClusterFileTransferException 89 { 90 String myNodeName = this.mPartition.getNodeName(); 91 ClusterNode myNodeAddress = this.mPartition.getClusterNode(); 92 FileOutputStream output = null; 93 try 94 { 95 log.info("Start pull of file " + file.getName() + " from cluster."); 96 ArrayList response = mPartition.callMethodOnCoordinatorNode(SERVICE_NAME, 97 "remotePullOpenFile", 98 new Object []{file, myNodeName, myNodeAddress, parentName}, new Class []{java.io.File .class, java.lang.String .class,ClusterNode.class, java.lang.String .class}, 99 true); 100 101 if (response == null || response.size() < 1) 102 { 103 throw new ClusterFileTransferException("Did not receive response from remote machine trying to open file '" + file + "'. Check remote machine error log."); 104 } 105 106 FileContentChunk fileChunk = (FileContentChunk) response.get(0); 107 if(null == fileChunk) 108 { 109 throw new ClusterFileTransferException("An error occured on remote machine trying to open file '" + file + "'. Check remote machine error log."); 110 } 111 112 File tempFile = new File(ClusterFileTransfer.getServerTempDir(), file.getName()); 113 output = new FileOutputStream(tempFile); 114 115 long lastModification = fileChunk.lastModified(); 117 while (fileChunk.mByteCount > 0) 118 { 119 output.write(fileChunk.mChunk, 0, fileChunk.mByteCount); 120 response = mPartition.callMethodOnCoordinatorNode(SERVICE_NAME, 121 "remotePullReadFile", 122 new Object []{file, myNodeName}, new Class []{java.io.File .class, java.lang.String .class}, 123 true); 124 if (response.size() < 1) 125 { 126 if(! tempFile.delete()) 127 throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'. Is remote still running? Also, we couldn't delete temp file "+ tempFile.getName()); 128 throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'. Is remote still running?"); 129 } 130 fileChunk = (FileContentChunk) response.get(0); 131 if (null == fileChunk) 132 { 133 if( !tempFile.delete()) 134 throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'. Check remote machine error log. Also, we couldn't delete temp file "+ tempFile.getName()); 135 throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'. Check remote machine error log."); 136 } 137 } 138 output.close(); 139 output = null; 140 File target = new File(getParentFile(parentName), file.getName()); 141 if (target.exists()) { 142 if(!target.delete()) 143 throw new ClusterFileTransferException("The destination file "+ target + " couldn't be deleted, the updated application will not be copied to this node"); 144 145 } 146 tempFile.setLastModified(lastModification); 147 if (!localMove(tempFile,target)) 148 { 149 throw new ClusterFileTransferException("Could not move " + tempFile + " to " + target); 150 } 151 log.info("Finished cluster pull of file " + file.getName() + " to "+ target.getName()); 152 } 153 catch(IOException e) 154 { 155 throw new ClusterFileTransferException(e); 156 } 157 catch(ClusterFileTransferException e) 158 { 159 throw e; 160 } 161 catch(Exception e) 162 { 163 throw new ClusterFileTransferException(e); 164 } 165 finally { 166 if( output != null) { 167 try { 168 output.close(); 169 } 170 catch(IOException e) {logException(e);} } 172 } 173 } 174 175 184 public FileContentChunk remotePullOpenFile(File file, String originNodeName, ClusterNode originNode, String parentName) 185 { 186 try 187 { 188 File target = new File(getParentFile(parentName), file.getName()); 189 FileContentChunk fileChunk = new FileContentChunk(target, originNodeName,originNode); 190 FilePullOperation filePullOperation = new FilePullOperation(fileChunk); 191 this.mPullsInProcess.put(CompositeKey(originNodeName, file.getName()), filePullOperation); 193 filePullOperation.openInputFile(); 194 fileChunk.readNext(filePullOperation.getInputStream()); 195 return fileChunk; 196 } catch (IOException e) 197 { 198 logException(e); 199 } catch (Exception e) 200 { 201 logException(e); 202 } 203 return null; 204 } 205 206 214 public FileContentChunk remotePullReadFile(File file, String originNodeName) 215 { 216 try 217 { 218 FilePullOperation filePullOperation = (FilePullOperation) this.mPullsInProcess.get(CompositeKey(originNodeName, file.getName())); 219 filePullOperation.getFileChunk().readNext(filePullOperation.getInputStream()); 220 if (filePullOperation.getFileChunk().mByteCount < 1) 221 { 222 filePullOperation.getInputStream().close(); 224 this.mPullsInProcess.remove(CompositeKey(originNodeName, file.getName())); 225 } 226 return filePullOperation.getFileChunk(); 227 } catch (IOException e) 228 { 229 logException(e); 230 } 231 return null; 232 } 233 234 241 public void push(File file, String parentName, boolean leaveInTempFolder) throws ClusterFileTransferException 242 { 243 File target = new File(getParentFile(parentName), file.getName()); 244 245 log.info("Start push of file " + file.getName() + " to cluster."); 246 if (target.isDirectory()) 248 { 249 logMessage("You cannot send the contents of directories, consider archiving folder containing" + target.getName() + " instead."); 251 return; 252 } 253 ClusterNode myNodeAddress = this.mPartition.getClusterNode(); 254 FileContentChunk fileChunk = new FileContentChunk(target, this.mPartition.getNodeName(), myNodeAddress); 255 try 256 { 257 InputStream input = fileChunk.openInputFile(); 258 while (fileChunk.readNext(input) >= 0) 259 { 260 mPartition.callMethodOnCluster(SERVICE_NAME, "remotePushWriteFile", new Object []{fileChunk, parentName}, new Class []{fileChunk.getClass(), java.lang.String .class}, true); 261 } 262 mPartition.callMethodOnCluster(SERVICE_NAME, "remotePushCloseFile", new Object []{fileChunk, new Boolean (leaveInTempFolder), parentName}, new Class []{fileChunk.getClass(), Boolean .class, java.lang.String .class}, true); 264 input.close(); 265 log.info("Finished push of file " + file.getName() + " to cluster."); 266 } 267 catch(FileNotFoundException e) 268 { 269 throw new ClusterFileTransferException(e); 270 } 271 catch(IOException e) 272 { 273 throw new ClusterFileTransferException(e); 274 } 275 catch(Exception e) 276 { 277 throw new ClusterFileTransferException(e); 278 } 279 } 280 281 282 287 public void remotePushWriteFile(FileContentChunk fileChunk, String parentName) 288 { 289 try 290 { 291 String key = CompositeKey(fileChunk.getOriginatingNodeName(), fileChunk.getDestinationFile().getName()); 292 FilePushOperation filePushOperation = (FilePushOperation) mPushsInProcess.get(key); 293 294 if (filePushOperation == null) 296 { 297 if (fileChunk.mChunkNumber != FileContentChunk.FIRST_CHUNK) 298 { 299 logMessage("Ignoring file transfer of '" + fileChunk.getDestinationFile().getName() + "' from " + fileChunk.getOriginatingNodeName() + ", we missed the start of it."); 301 return; 302 } 303 filePushOperation = new FilePushOperation(fileChunk.getOriginatingNodeName(), fileChunk.getOriginatingNode()); 304 File tempFile = new File(ClusterFileTransfer.getServerTempDir(), fileChunk.getDestinationFile().getName()); 305 filePushOperation.openOutputFile(tempFile); 306 mPushsInProcess.put(key, filePushOperation); 307 } 308 filePushOperation.getOutputStream().write(fileChunk.mChunk, 0, fileChunk.mByteCount); 309 } catch (FileNotFoundException e) 310 { 311 logException(e); 312 } catch (IOException e) 313 { 314 logException(e); 315 } 316 } 317 318 324 public void remotePushCloseFile(FileContentChunk fileChunk, Boolean leaveInTempFolder, String parentName) 325 { 326 try 327 { 328 FilePushOperation filePushOperation = (FilePushOperation) mPushsInProcess.remove(CompositeKey(fileChunk.getOriginatingNodeName(), fileChunk.getDestinationFile().getName())); 329 330 if ((filePushOperation != null) && (filePushOperation.getOutputStream() != null)) 331 { 332 filePushOperation.getOutputStream().close(); 333 if (!leaveInTempFolder.booleanValue()) 334 { 335 File tempFile = new File(ClusterFileTransfer.getServerTempDir(), fileChunk.getDestinationFile().getName()); 336 File target = new File(getParentFile(parentName), fileChunk.getDestinationFile().getName()); 337 if (target.exists()) 338 if(!target.delete()) 339 logMessage("Could not delete target file " + target); 340 341 tempFile.setLastModified(fileChunk.lastModified()); 342 if (!localMove(tempFile,target)) 343 { 344 logMessage("Could not move " + tempFile + " to " + target); 345 } 346 } 347 } 348 } catch (IOException e) 349 { 350 logException(e); 351 } 352 } 353 354 360 public void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers) 361 { 362 367 if (mPushsInProcess.size() > 0) 368 { 369 synchronized(mPushsInProcess) 370 { 371 Collection values = mPushsInProcess.values(); 372 Iterator iter = values.iterator(); 373 while (iter.hasNext()) 374 { 375 FilePushOperation push = (FilePushOperation)iter.next(); 376 if (deadMembers.contains(push.getOriginatingNode())) 377 { 378 push.cancel(); 380 iter.remove(); 381 } 382 } 383 } 384 } 385 386 if (mPullsInProcess.size() > 0) 387 { 388 synchronized(mPullsInProcess) 389 { 390 Collection values = mPullsInProcess.values(); 391 Iterator iter = values.iterator(); 392 while(iter.hasNext()) 393 { 394 FilePullOperation pull = (FilePullOperation)iter.next(); 395 if (deadMembers.contains(pull.getFileChunk().getOriginatingNode())) 396 { 397 pull.cancel(); 399 iter.remove(); 400 } 401 } 402 } 403 } 404 } 405 406 private static File getServerTempDir() 407 { 408 return TEMP_DIRECTORY; 409 } 410 411 private File getParentFile(String parentName) 412 { 413 return (File) mParentFolders.get(parentName); 414 } 415 416 private String CompositeKey(String originNodeName, String fileName) 417 { 418 return originNodeName + "#" + fileName; 419 } 420 421 private static void logMessage(String message) 422 { 423 log.info(message); 424 } 425 426 private static void logException(Throwable e) 427 { 428 log.error(e); 430 } 431 432 433 436 private static class FilePushOperation { 437 438 439 public FilePushOperation(String originNodeName, ClusterNode originNode) 440 { 441 mOriginNodeName =originNodeName; 442 mOriginNode = originNode; 443 } 444 445 public void openOutputFile(File file) throws FileNotFoundException 446 { 447 mOutput = new FileOutputStream(file); 448 mOutputFile = file; 449 } 450 451 455 public void cancel() 456 { 457 ClusterFileTransfer.logMessage("Canceling receive of file " + mOutputFile + " as remote server "+mOriginNodeName+" left the cluster. Partial results will be deleted."); 458 try 459 { 460 mOutput.close(); 462 if(!mOutputFile.delete()) 463 logMessage("Could not delete output file " + mOutputFile); 464 } 465 catch(IOException e) { logException(e); } 466 } 467 468 472 public ClusterNode getOriginatingNode() 473 { 474 return mOriginNode; 475 } 476 477 public OutputStream getOutputStream() 478 { 479 return mOutput; 480 } 481 482 private OutputStream mOutput; 483 private String mOriginNodeName; 484 private ClusterNode mOriginNode; 485 private File mOutputFile; 486 } 487 488 491 private static class FilePullOperation { 492 public FilePullOperation(FileContentChunk fileChunk) 493 { 494 mFileChunk = fileChunk; 495 } 496 497 public void openInputFile() throws FileNotFoundException 498 { 499 mInput = mFileChunk.openInputFile(); 500 } 501 502 public InputStream getInputStream() 503 { 504 return mInput; 505 } 506 507 510 public void cancel() 511 { 512 logMessage("Canceling send of file " + mFileChunk.getDestinationFile() + " as remote server "+mFileChunk.getOriginatingNodeName()+" left the cluster."); 513 try 514 { 515 mInput.close(); 516 } 517 catch(IOException e) { logException(e); } 518 } 519 520 public FileContentChunk getFileChunk() 521 { 522 return mFileChunk; 523 } 524 525 private FileContentChunk mFileChunk; 526 private InputStream mInput; 527 } 528 529 533 private static class FileContentChunk implements Serializable { 534 535 public FileContentChunk(File file, String originNodeName, ClusterNode originNode) 536 { 537 this.mDestinationFile = file; 538 this.mLastModified = file.lastModified(); 539 this.mOriginNode = originNode; 540 this.mOriginNodeName = originNodeName; 541 mChunkNumber = 0; 542 long size = file.length(); 543 if (size > MAX_CHUNK_BUFFER_SIZE) 544 size = MAX_CHUNK_BUFFER_SIZE; 545 mChunk = new byte[(int) size]; mByteCount = 0; 547 } 548 549 554 public String getOriginatingNodeName() 555 { 556 return this.mOriginNodeName; 557 } 558 559 563 public ClusterNode getOriginatingNode() 564 { 565 return mOriginNode; 566 } 567 568 public File getDestinationFile() 569 { 570 return this.mDestinationFile; 571 } 572 573 578 public InputStream openInputFile() throws FileNotFoundException 579 { 580 return new FileInputStream(this.mDestinationFile); 581 } 582 583 589 public OutputStream openOutputFile() throws FileNotFoundException 590 { 591 File lFile = new File(ClusterFileTransfer.getServerTempDir(), this.mDestinationFile.getName()); 592 FileOutputStream output = new FileOutputStream(lFile); 593 return output; 594 } 595 596 600 public int readNext(InputStream input) throws IOException 601 { 602 this.mChunkNumber++; 603 this.mByteCount = input.read(this.mChunk); 604 return this.mByteCount; 605 } 606 607 public long lastModified() 608 { 609 return mLastModified; 610 } 611 612 static final long serialVersionUID = 3546447481674749363L; 613 private File mDestinationFile; 614 private long mLastModified; 615 private String mOriginNodeName; 616 private ClusterNode mOriginNode; 617 private int mChunkNumber; 618 private static final int FIRST_CHUNK = 1; 619 private byte[] mChunk; 620 private int mByteCount; 621 } 622 623 public static boolean localMove(File source, File destination) throws FileNotFoundException, IOException { 624 if(source.renameTo(destination)) return true; OutputStream out = new FileOutputStream(destination); 628 InputStream in = new FileInputStream(source); 629 byte buffer[] = new byte[32*1024]; 630 int bytesRead = 0; 631 while(bytesRead > -1) { bytesRead = in.read(buffer); 633 if(bytesRead > 0) { 634 out.write(buffer,0, bytesRead); 635 } 636 } 637 in.close(); 638 out.close(); 639 if(!source.delete()) 640 logMessage("Could not delete file "+ source); 641 return true; 642 } 643 644 647 public static class ClusterFileTransferException extends Exception 648 { 649 public ClusterFileTransferException(String message) 650 { 651 super(message); 652 } 653 654 public ClusterFileTransferException(String message, Throwable cause) 655 { 656 super(message, cause); 657 } 658 659 public ClusterFileTransferException(Throwable cause) 660 { 661 super(cause); 662 } 663 } 664 } 665 | Popular Tags |