KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > ha > framework > server > ClusterFileTransfer


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.ha.framework.server;
23
24 import org.jboss.ha.framework.interfaces.HAPartition;
25 import org.jboss.ha.framework.interfaces.ClusterNode;
26 import org.jboss.system.server.ServerConfigLocator;
27 import org.jboss.logging.Logger;
28 import org.jboss.ha.framework.interfaces.HAPartition.AsynchHAMembershipListener;
29
30 import java.util.*;
31 import java.io.*;
32
/**
 * Handles transferring files on the cluster. Files are sent in small chunks at a time (up to MAX_CHUNK_BUFFER_SIZE
 * bytes per cluster call).
 *
 * @author <a HREF="mailto:smarlow@novell.com">Scott Marlow</a>.
 * @version $Revision: 56893 $
 */

40 public class ClusterFileTransfer implements AsynchHAMembershipListener {
41
42    // Specify max file transfer buffer size that we read and write at a time.
43
// This influences the number of times that we will invoke disk read/write file
44
// operations versus how much memory we will consume for a file transfer.
45
private static final int MAX_CHUNK_BUFFER_SIZE = 512 * 1024;
46
47    // collection of in-progress file push operations
48
private Map mPushsInProcess = Collections.synchronizedMap(new HashMap());
49
50    // collection of in-progress file pull operations
51
private Map mPullsInProcess = Collections.synchronizedMap(new HashMap());
52
53    private HAPartition mPartition;
54
55    private static final File TEMP_DIRECTORY = ServerConfigLocator.locate().getServerTempDir();
56
57    // Mapping between parent folder name and target destination folder
58
// the search key is the parent folder name and value is the java.io.File.
59
// We don't synchronize on the mParentFolders as we assume its safe to read it.
60
private Map mParentFolders = null;
61
62    private static final String JavaDoc SERVICE_NAME = ClusterFileTransfer.class.getName() + "Service";
63
64    private static final Logger log = Logger.getLogger(ClusterFileTransfer.class.getName());
65
/**
 * Constructor needs the cluster partition and the mapping of server folder names to the java.io.File instance
 * representing the physical folder.
 *
 * @param partition represents the cluster.
 * @param destinationDirectoryMap is the mapping between server folder name and physical folder representation.
 */
public ClusterFileTransfer(HAPartition partition, Map destinationDirectoryMap)
{
   // Assign all state BEFORE handing 'this' to the partition. Once registered, remote calls
   // and membership callbacks may arrive, and they read mParentFolders via getParentFile().
   this.mPartition = partition;
   this.mParentFolders = destinationDirectoryMap;
   this.mPartition.registerRPCHandler(SERVICE_NAME, this);
   this.mPartition.registerMembershipListener(this);
}
80
81    /**
82     * Get specified file from the cluster.
83     *
84     * @param file identifies the file to get from the cluster.
85     * @param parentName is the parent folder name for the file on both source and destination nodes.
86     * @throws ClusterFileTransferException
87     */

88    public void pull(File file, String JavaDoc parentName) throws ClusterFileTransferException
89    {
90       String JavaDoc myNodeName = this.mPartition.getNodeName();
91       ClusterNode myNodeAddress = this.mPartition.getClusterNode();
92       FileOutputStream output = null;
93       try
94       {
95          log.info("Start pull of file " + file.getName() + " from cluster.");
96          ArrayList response = mPartition.callMethodOnCoordinatorNode(SERVICE_NAME,
97             "remotePullOpenFile",
98             new Object JavaDoc[]{file, myNodeName, myNodeAddress, parentName}, new Class JavaDoc[]{java.io.File JavaDoc.class, java.lang.String JavaDoc.class,ClusterNode.class, java.lang.String JavaDoc.class},
99             true);
100
101          if (response == null || response.size() < 1)
102          {
103             throw new ClusterFileTransferException("Did not receive response from remote machine trying to open file '" + file + "'. Check remote machine error log.");
104          }
105
106          FileContentChunk fileChunk = (FileContentChunk) response.get(0);
107          if(null == fileChunk)
108          {
109             throw new ClusterFileTransferException("An error occured on remote machine trying to open file '" + file + "'. Check remote machine error log.");
110          }
111
112          File tempFile = new File(ClusterFileTransfer.getServerTempDir(), file.getName());
113          output = new FileOutputStream(tempFile);
114
115          // get the remote file modification time and change our local copy to have the same time.
116
long lastModification = fileChunk.lastModified();
117          while (fileChunk.mByteCount > 0)
118          {
119             output.write(fileChunk.mChunk, 0, fileChunk.mByteCount);
120             response = mPartition.callMethodOnCoordinatorNode(SERVICE_NAME,
121                "remotePullReadFile",
122                new Object JavaDoc[]{file, myNodeName}, new Class JavaDoc[]{java.io.File JavaDoc.class, java.lang.String JavaDoc.class},
123                true);
124             if (response.size() < 1)
125             {
126                if(! tempFile.delete())
127                   throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'. Is remote still running? Also, we couldn't delete temp file "+ tempFile.getName());
128                throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'. Is remote still running?");
129             }
130             fileChunk = (FileContentChunk) response.get(0);
131             if (null == fileChunk)
132             {
133                if( !tempFile.delete())
134                   throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'. Check remote machine error log. Also, we couldn't delete temp file "+ tempFile.getName());
135                throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'. Check remote machine error log.");
136             }
137          }
138          output.close();
139          output = null;
140          File target = new File(getParentFile(parentName), file.getName());
141          if (target.exists()) {
142             if(!target.delete())
143                throw new ClusterFileTransferException("The destination file "+ target + " couldn't be deleted, the updated application will not be copied to this node");
144
145          }
146          tempFile.setLastModified(lastModification);
147          if (!localMove(tempFile,target))
148          {
149             throw new ClusterFileTransferException("Could not move " + tempFile + " to " + target);
150          }
151          log.info("Finished cluster pull of file " + file.getName() + " to "+ target.getName());
152       }
153       catch(IOException e)
154       {
155          throw new ClusterFileTransferException(e);
156       }
157       catch(ClusterFileTransferException e)
158       {
159          throw e;
160       }
161       catch(Exception JavaDoc e)
162       {
163          throw new ClusterFileTransferException(e);
164       }
165       finally {
166          if( output != null) {
167             try {
168                output.close();
169             }
170             catch(IOException e) {logException(e);} // we are already in the middle of a throw if output isn't null.
171
}
172       }
173    }
174
175    /**
176     * This is remotely called by {@link #pull(File , String )} to open the file on the machine that
177     * the file is being copied from.
178     *
179     * @param file is the file to pull.
180     * @param originNodeName is the cluster node that is requesting the file.
181     * @param parentName is the parent folder name for the file on both source and destination nodes.
182     * @return FileContentChunk containing the first part of the file read after opening it.
183     */

184    public FileContentChunk remotePullOpenFile(File file, String JavaDoc originNodeName, ClusterNode originNode, String JavaDoc parentName)
185    {
186       try
187       {
188          File target = new File(getParentFile(parentName), file.getName());
189          FileContentChunk fileChunk = new FileContentChunk(target, originNodeName,originNode);
190          FilePullOperation filePullOperation = new FilePullOperation(fileChunk);
191          // save the operation for the next call to remoteReadFile
192
this.mPullsInProcess.put(CompositeKey(originNodeName, file.getName()), filePullOperation);
193          filePullOperation.openInputFile();
194          fileChunk.readNext(filePullOperation.getInputStream());
195          return fileChunk;
196       } catch (IOException e)
197       {
198          logException(e);
199       } catch (Exception JavaDoc e)
200       {
201          logException(e);
202       }
203       return null;
204    }
205
206    /**
207     * This is remotely called by {@link #pull(File, String )} to read the file on the machine that the file is being
208     * copied from.
209     *
210     * @param file is the file to pull.
211     * @param originNodeName is the cluster node that is requesting the file.
212     * @return FileContentChunk containing the next part of the file read.
213     */

214    public FileContentChunk remotePullReadFile(File file, String JavaDoc originNodeName)
215    {
216       try
217       {
218          FilePullOperation filePullOperation = (FilePullOperation) this.mPullsInProcess.get(CompositeKey(originNodeName, file.getName()));
219          filePullOperation.getFileChunk().readNext(filePullOperation.getInputStream());
220          if (filePullOperation.getFileChunk().mByteCount < 1)
221          {
222             // last call to read, so clean up
223
filePullOperation.getInputStream().close();
224             this.mPullsInProcess.remove(CompositeKey(originNodeName, file.getName()));
225          }
226          return filePullOperation.getFileChunk();
227       } catch (IOException e)
228       {
229          logException(e);
230       }
231       return null;
232    }
233
234    /**
235     * Send specified file to cluster.
236     *
237     * @param file is the file to send.
238     * @param leaveInTempFolder is true if the file should be left in the server temp folder.
239     * @throws ClusterFileTransferException
240     */

241    public void push(File file, String JavaDoc parentName, boolean leaveInTempFolder) throws ClusterFileTransferException
242    {
243       File target = new File(getParentFile(parentName), file.getName());
244
245       log.info("Start push of file " + file.getName() + " to cluster.");
246       // check if trying to send explored archive (cannot send subdirectories)
247
if (target.isDirectory())
248       {
249          // let the user know why we are skipping this file and return.
250
logMessage("You cannot send the contents of directories, consider archiving folder containing" + target.getName() + " instead.");
251          return;
252       }
253       ClusterNode myNodeAddress = this.mPartition.getClusterNode();
254       FileContentChunk fileChunk = new FileContentChunk(target, this.mPartition.getNodeName(), myNodeAddress);
255       try
256       {
257          InputStream input = fileChunk.openInputFile();
258          while (fileChunk.readNext(input) >= 0)
259          {
260             mPartition.callMethodOnCluster(SERVICE_NAME, "remotePushWriteFile", new Object JavaDoc[]{fileChunk, parentName}, new Class JavaDoc[]{fileChunk.getClass(), java.lang.String JavaDoc.class}, true);
261          }
262          // tell remote(s) to close the output file
263
mPartition.callMethodOnCluster(SERVICE_NAME, "remotePushCloseFile", new Object JavaDoc[]{fileChunk, new Boolean JavaDoc(leaveInTempFolder), parentName}, new Class JavaDoc[]{fileChunk.getClass(), Boolean JavaDoc.class, java.lang.String JavaDoc.class}, true);
264          input.close();
265          log.info("Finished push of file " + file.getName() + " to cluster.");
266       }
267       catch(FileNotFoundException e)
268       {
269          throw new ClusterFileTransferException(e);
270       }
271       catch(IOException e)
272       {
273          throw new ClusterFileTransferException(e);
274       }
275       catch(Exception JavaDoc e)
276       {
277          throw new ClusterFileTransferException(e);
278       }
279    }
280
281
282    /**
283     * Remote method for writing file a fragment at a time.
284     *
285     * @param fileChunk
286     */

287    public void remotePushWriteFile(FileContentChunk fileChunk, String JavaDoc parentName)
288    {
289       try
290       {
291          String JavaDoc key = CompositeKey(fileChunk.getOriginatingNodeName(), fileChunk.getDestinationFile().getName());
292          FilePushOperation filePushOperation = (FilePushOperation) mPushsInProcess.get(key);
293
294          // handle first call to write
295
if (filePushOperation == null)
296          {
297             if (fileChunk.mChunkNumber != FileContentChunk.FIRST_CHUNK)
298             {
299                // we joined the cluster after the file transfer started
300
logMessage("Ignoring file transfer of '" + fileChunk.getDestinationFile().getName() + "' from " + fileChunk.getOriginatingNodeName() + ", we missed the start of it.");
301                return;
302             }
303             filePushOperation = new FilePushOperation(fileChunk.getOriginatingNodeName(), fileChunk.getOriginatingNode());
304             File tempFile = new File(ClusterFileTransfer.getServerTempDir(), fileChunk.getDestinationFile().getName());
305             filePushOperation.openOutputFile(tempFile);
306             mPushsInProcess.put(key, filePushOperation);
307          }
308          filePushOperation.getOutputStream().write(fileChunk.mChunk, 0, fileChunk.mByteCount);
309       } catch (FileNotFoundException e)
310       {
311          logException(e);
312       } catch (IOException e)
313       {
314          logException(e);
315       }
316    }
317
318    /**
319     * Remote method for closing the file just transmitted.
320     *
321     * @param fileChunk
322     * @param leaveInTempFolder is true if we should leave the file in the server temp folder
323     */

324    public void remotePushCloseFile(FileContentChunk fileChunk, Boolean JavaDoc leaveInTempFolder, String JavaDoc parentName)
325    {
326       try
327       {
328          FilePushOperation filePushOperation = (FilePushOperation) mPushsInProcess.remove(CompositeKey(fileChunk.getOriginatingNodeName(), fileChunk.getDestinationFile().getName()));
329
330          if ((filePushOperation != null) && (filePushOperation.getOutputStream() != null))
331          {
332             filePushOperation.getOutputStream().close();
333             if (!leaveInTempFolder.booleanValue())
334             {
335                File tempFile = new File(ClusterFileTransfer.getServerTempDir(), fileChunk.getDestinationFile().getName());
336                File target = new File(getParentFile(parentName), fileChunk.getDestinationFile().getName());
337                if (target.exists())
338                   if(!target.delete())
339                      logMessage("Could not delete target file " + target);
340
341                tempFile.setLastModified(fileChunk.lastModified());
342                if (!localMove(tempFile,target))
343                {
344                   logMessage("Could not move " + tempFile + " to " + target);
345                }
346             }
347          }
348       } catch (IOException e)
349       {
350          logException(e);
351       }
352    }
353
354    /** Called when a new partition topology occurs. see HAPartition.AsynchHAMembershipListener
355     *
356     * @param deadMembers A list of nodes that have died since the previous view
357     * @param newMembers A list of nodes that have joined the partition since the previous view
358     * @param allMembers A list of nodes that built the current view
359     */

360    public void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers)
361    {
362       // Are there any deadMembers contained in mPushsInProcess or in mPullsInProcess.
363
// If so, cancel operations for them.
364
// If contained in mPushsInProcess, then we can stop waiting for the rest of the file transfer.
365
// If contained in mPullsInProcess, then we can stop supplying for the rest of the file transfer.
366

367       if (mPushsInProcess.size() > 0)
368       {
369          synchronized(mPushsInProcess)
370          {
371             Collection values = mPushsInProcess.values();
372             Iterator iter = values.iterator();
373             while (iter.hasNext())
374             {
375                FilePushOperation push = (FilePushOperation)iter.next();
376                if (deadMembers.contains(push.getOriginatingNode()))
377                {
378                   // cancel the operation and remove the operation from mPushsInProcess
379
push.cancel();
380                   iter.remove();
381                }
382             }
383          }
384       }
385
386       if (mPullsInProcess.size() > 0)
387       {
388          synchronized(mPullsInProcess)
389          {
390             Collection values = mPullsInProcess.values();
391             Iterator iter = values.iterator();
392             while(iter.hasNext())
393             {
394                FilePullOperation pull = (FilePullOperation)iter.next();
395                if (deadMembers.contains(pull.getFileChunk().getOriginatingNode()))
396                {
397                   // cancel the operation and remove the operation from mPullsInProcess
398
pull.cancel();
399                   iter.remove();
400                }
401             }
402          }
403       }
404    }
405
406    private static File getServerTempDir()
407    {
408       return TEMP_DIRECTORY;
409    }
410
411    private File getParentFile(String JavaDoc parentName)
412    {
413       return (File) mParentFolders.get(parentName);
414    }
415
416    private String JavaDoc CompositeKey(String JavaDoc originNodeName, String JavaDoc fileName)
417    {
418       return originNodeName + "#" + fileName;
419    }
420
421    private static void logMessage(String JavaDoc message)
422    {
423       log.info(message);
424    }
425
426    private static void logException(Throwable JavaDoc e)
427    {
428       //e.printStackTrace();
429
log.error(e);
430    }
431
432
433    /**
434     * Represents file push operation.
435     */

436    private static class FilePushOperation {
437
438
439       public FilePushOperation(String JavaDoc originNodeName, ClusterNode originNode)
440       {
441          mOriginNodeName =originNodeName;
442          mOriginNode = originNode;
443       }
444
445       public void openOutputFile(File file) throws FileNotFoundException
446       {
447          mOutput = new FileOutputStream(file);
448          mOutputFile = file;
449       }
450
451       /**
452        * Cancel the file push operation. To be called locally on each machine that is
453        * receiving the file.
454        */

455       public void cancel()
456       {
457          ClusterFileTransfer.logMessage("Canceling receive of file " + mOutputFile + " as remote server "+mOriginNodeName+" left the cluster. Partial results will be deleted.");
458          try
459          {
460             // close the output stream and delete the file.
461
mOutput.close();
462             if(!mOutputFile.delete())
463                logMessage("Could not delete output file " + mOutputFile);
464          }
465          catch(IOException e) { logException(e); }
466       }
467
468       /**
469        * Get the IPAddress of the cluster node that is pushing file to the cluster.
470        * @return IPAddress
471        */

472       public ClusterNode getOriginatingNode()
473       {
474          return mOriginNode;
475       }
476
477       public OutputStream getOutputStream()
478       {
479          return mOutput;
480       }
481
482       private OutputStream mOutput;
483       private String JavaDoc mOriginNodeName;
484       private ClusterNode mOriginNode;
485       private File mOutputFile;
486    }
487
488    /**
489     * Represents file pull operation.
490     */

491    private static class FilePullOperation {
492       public FilePullOperation(FileContentChunk fileChunk)
493       {
494          mFileChunk = fileChunk;
495       }
496
497       public void openInputFile() throws FileNotFoundException
498       {
499          mInput = mFileChunk.openInputFile();
500       }
501
502       public InputStream getInputStream()
503       {
504          return mInput;
505       }
506
507       /**
508        * Cancel the file pull operation. To be called locally on the machine that is supplying the file.
509        */

510       public void cancel()
511       {
512          logMessage("Canceling send of file " + mFileChunk.getDestinationFile() + " as remote server "+mFileChunk.getOriginatingNodeName()+" left the cluster.");
513          try
514          {
515             mInput.close();
516          }
517          catch(IOException e) { logException(e); }
518       }
519
520       public FileContentChunk getFileChunk()
521       {
522          return mFileChunk;
523       }
524
525       private FileContentChunk mFileChunk;
526       private InputStream mInput;
527    }
528
529    /**
530     * For representing filetransfer state on the wire.
531     * The inputStream or OutputStream is expected to be maintained by the sender/receiver.
532     */

533    private static class FileContentChunk implements Serializable {
534
535       public FileContentChunk(File file, String JavaDoc originNodeName, ClusterNode originNode)
536       {
537          this.mDestinationFile = file;
538          this.mLastModified = file.lastModified();
539          this.mOriginNode = originNode;
540          this.mOriginNodeName = originNodeName;
541          mChunkNumber = 0;
542          long size = file.length();
543          if (size > MAX_CHUNK_BUFFER_SIZE)
544             size = MAX_CHUNK_BUFFER_SIZE;
545          mChunk = new byte[(int) size]; // set amount transferred at a time
546
mByteCount = 0;
547       }
548
549       /**
550        * Get the name of the cluster node that started the file transfer operation
551        *
552        * @return node name
553        */

554       public String JavaDoc getOriginatingNodeName()
555       {
556          return this.mOriginNodeName;
557       }
558
559       /**
560        * Get the address of the cluster node that started the file transfer operation.
561        * @return ClusterNode
562        */

563       public ClusterNode getOriginatingNode()
564       {
565          return mOriginNode;
566       }
567
568       public File getDestinationFile()
569       {
570          return this.mDestinationFile;
571       }
572
573       /**
574        * Open input file
575        *
576        * @throws FileNotFoundException
577        */

578       public InputStream openInputFile() throws FileNotFoundException
579       {
580          return new FileInputStream(this.mDestinationFile);
581       }
582
583       /**
584        * Open output file
585        *
586        * @return
587        * @throws FileNotFoundException
588        */

589       public OutputStream openOutputFile() throws FileNotFoundException
590       {
591          File lFile = new File(ClusterFileTransfer.getServerTempDir(), this.mDestinationFile.getName());
592          FileOutputStream output = new FileOutputStream(lFile);
593          return output;
594       }
595
596       /**
597        * @return number of bytes read
598        * @throws IOException
599        */

600       public int readNext(InputStream input) throws IOException
601       {
602          this.mChunkNumber++;
603          this.mByteCount = input.read(this.mChunk);
604          return this.mByteCount;
605       }
606
607       public long lastModified()
608       {
609          return mLastModified;
610       }
611
612       static final long serialVersionUID = 3546447481674749363L;
613       private File mDestinationFile;
614       private long mLastModified;
615       private String JavaDoc mOriginNodeName;
616       private ClusterNode mOriginNode;
617       private int mChunkNumber;
618       private static final int FIRST_CHUNK = 1;
619       private byte[] mChunk;
620       private int mByteCount;
621    }
622
623    public static boolean localMove(File source, File destination) throws FileNotFoundException, IOException {
624       if(source.renameTo(destination)) // if we can simply rename the file
625
return true; // return success
626
// otherwise, copy source to destination
627
OutputStream out = new FileOutputStream(destination);
628       InputStream in = new FileInputStream(source);
629       byte buffer[] = new byte[32*1024];
630       int bytesRead = 0;
631       while(bytesRead > -1) { // until we hit end of source file
632
bytesRead = in.read(buffer);
633          if(bytesRead > 0) {
634             out.write(buffer,0, bytesRead);
635          }
636       }
637       in.close();
638       out.close();
639       if(!source.delete())
640          logMessage("Could not delete file "+ source);
641       return true;
642    }
643
644    /**
645     * Exception wrapper class
646     */

647    public static class ClusterFileTransferException extends Exception JavaDoc
648    {
649       public ClusterFileTransferException(String JavaDoc message)
650       {
651          super(message);
652       }
653
654       public ClusterFileTransferException(String JavaDoc message, Throwable JavaDoc cause)
655       {
656          super(message, cause);
657       }
658
659       public ClusterFileTransferException(Throwable JavaDoc cause)
660       {
661          super(cause);
662       }
663    }
664 }
665
Popular Tags